Use constants.ETC_HOSTS instead of string for /etc/hosts
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _AddHostToEtcHosts(hostname):
167   """Wrapper around utils.SetEtcHostsEntry.
168
169   """
170   hi = utils.HostInfo(name=hostname)
171   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172
173
174 def _RemoveHostFromEtcHosts(hostname):
175   """Wrapper around utils.RemoveEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181
182
183 def _GetWantedNodes(lu, nodes):
184   """Returns list of checked and expanded node names.
185
186   Args:
187     nodes: List of nodes (strings) or None for all
188
189   """
190   if not isinstance(nodes, list):
191     raise errors.OpPrereqError("Invalid argument type 'nodes'")
192
193   if nodes:
194     wanted = []
195
196     for name in nodes:
197       node = lu.cfg.ExpandNodeName(name)
198       if node is None:
199         raise errors.OpPrereqError("No such node name '%s'" % name)
200       wanted.append(node)
201
202   else:
203     wanted = lu.cfg.GetNodeList()
204   return utils.NiceSort(wanted)
205
206
207 def _GetWantedInstances(lu, instances):
208   """Returns list of checked and expanded instance names.
209
210   Args:
211     instances: List of instances (strings) or None for all
212
213   """
214   if not isinstance(instances, list):
215     raise errors.OpPrereqError("Invalid argument type 'instances'")
216
217   if instances:
218     wanted = []
219
220     for name in instances:
221       instance = lu.cfg.ExpandInstanceName(name)
222       if instance is None:
223         raise errors.OpPrereqError("No such instance name '%s'" % name)
224       wanted.append(instance)
225
226   else:
227     wanted = lu.cfg.GetInstanceList()
228   return utils.NiceSort(wanted)
229
230
231 def _CheckOutputFields(static, dynamic, selected):
232   """Checks whether all selected fields are valid.
233
234   Args:
235     static: Static fields
236     dynamic: Dynamic fields
237
238   """
239   static_fields = frozenset(static)
240   dynamic_fields = frozenset(dynamic)
241
242   all_fields = static_fields | dynamic_fields
243
244   if not all_fields.issuperset(selected):
245     raise errors.OpPrereqError("Unknown output fields selected: %s"
246                                % ",".join(frozenset(selected).
247                                           difference(all_fields)))
248
249
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251                           memory, vcpus, nics):
252   """Builds instance related env variables for hooks from single variables.
253
254   Args:
255     secondary_nodes: List of secondary nodes as strings
256   """
257   env = {
258     "OP_TARGET": name,
259     "INSTANCE_NAME": name,
260     "INSTANCE_PRIMARY": primary_node,
261     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262     "INSTANCE_OS_TYPE": os_type,
263     "INSTANCE_STATUS": status,
264     "INSTANCE_MEMORY": memory,
265     "INSTANCE_VCPUS": vcpus,
266   }
267
268   if nics:
269     nic_count = len(nics)
270     for idx, (ip, bridge, mac) in enumerate(nics):
271       if ip is None:
272         ip = ""
273       env["INSTANCE_NIC%d_IP" % idx] = ip
274       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
276   else:
277     nic_count = 0
278
279   env["INSTANCE_NIC_COUNT"] = nic_count
280
281   return env
282
283
284 def _BuildInstanceHookEnvByObject(instance, override=None):
285   """Builds instance related env variables for hooks from an object.
286
287   Args:
288     instance: objects.Instance object of instance
289     override: dict of values to override
290   """
291   args = {
292     'name': instance.name,
293     'primary_node': instance.primary_node,
294     'secondary_nodes': instance.secondary_nodes,
295     'os_type': instance.os,
296     'status': instance.os,
297     'memory': instance.memory,
298     'vcpus': instance.vcpus,
299     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
300   }
301   if override:
302     args.update(override)
303   return _BuildInstanceHookEnv(**args)
304
305
306 def _UpdateKnownHosts(fullnode, ip, pubkey):
307   """Ensure a node has a correct known_hosts entry.
308
309   Args:
310     fullnode - Fully qualified domain name of host. (str)
311     ip       - IPv4 address of host (str)
312     pubkey   - the public key of the cluster
313
314   """
315   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
316     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
317   else:
318     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
319
320   inthere = False
321
322   save_lines = []
323   add_lines = []
324   removed = False
325
326   for rawline in f:
327     logger.Debug('read %s' % (repr(rawline),))
328
329     parts = rawline.rstrip('\r\n').split()
330
331     # Ignore unwanted lines
332     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
333       fields = parts[0].split(',')
334       key = parts[2]
335
336       haveall = True
337       havesome = False
338       for spec in [ ip, fullnode ]:
339         if spec not in fields:
340           haveall = False
341         if spec in fields:
342           havesome = True
343
344       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
345       if haveall and key == pubkey:
346         inthere = True
347         save_lines.append(rawline)
348         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
349         continue
350
351       if havesome and (not haveall or key != pubkey):
352         removed = True
353         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
354         continue
355
356     save_lines.append(rawline)
357
358   if not inthere:
359     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
360     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
361
362   if removed:
363     save_lines = save_lines + add_lines
364
365     # Write a new file and replace old.
366     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
367                                    constants.DATA_DIR)
368     newfile = os.fdopen(fd, 'w')
369     try:
370       newfile.write(''.join(save_lines))
371     finally:
372       newfile.close()
373     logger.Debug("Wrote new known_hosts.")
374     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
375
376   elif add_lines:
377     # Simply appending a new line will do the trick.
378     f.seek(0, 2)
379     for add in add_lines:
380       f.write(add)
381
382   f.close()
383
384
385 def _HasValidVG(vglist, vgname):
386   """Checks if the volume group list is valid.
387
388   A non-None return value means there's an error, and the return value
389   is the error message.
390
391   """
392   vgsize = vglist.get(vgname, None)
393   if vgsize is None:
394     return "volume group '%s' missing" % vgname
395   elif vgsize < 20480:
396     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
397             (vgname, vgsize))
398   return None
399
400
401 def _InitSSHSetup(node):
402   """Setup the SSH configuration for the cluster.
403
404
405   This generates a dsa keypair for root, adds the pub key to the
406   permitted hosts and adds the hostkey to its own known hosts.
407
408   Args:
409     node: the name of this host as a fqdn
410
411   """
412   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
413
414   for name in priv_key, pub_key:
415     if os.path.exists(name):
416       utils.CreateBackup(name)
417     utils.RemoveFile(name)
418
419   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
420                          "-f", priv_key,
421                          "-q", "-N", ""])
422   if result.failed:
423     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
424                              result.output)
425
426   f = open(pub_key, 'r')
427   try:
428     utils.AddAuthorizedKey(auth_keys, f.read(8192))
429   finally:
430     f.close()
431
432
433 def _InitGanetiServerSetup(ss):
434   """Setup the necessary configuration for the initial node daemon.
435
436   This creates the nodepass file containing the shared password for
437   the cluster and also generates the SSL certificate.
438
439   """
440   # Create pseudo random password
441   randpass = sha.new(os.urandom(64)).hexdigest()
442   # and write it into sstore
443   ss.SetKey(ss.SS_NODED_PASS, randpass)
444
445   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
446                          "-days", str(365*5), "-nodes", "-x509",
447                          "-keyout", constants.SSL_CERT_FILE,
448                          "-out", constants.SSL_CERT_FILE, "-batch"])
449   if result.failed:
450     raise errors.OpExecError("could not generate server ssl cert, command"
451                              " %s had exitcode %s and error message %s" %
452                              (result.cmd, result.exit_code, result.output))
453
454   os.chmod(constants.SSL_CERT_FILE, 0400)
455
456   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
457
458   if result.failed:
459     raise errors.OpExecError("Could not start the node daemon, command %s"
460                              " had exitcode %s and error %s" %
461                              (result.cmd, result.exit_code, result.output))
462
463
464 def _CheckInstanceBridgesExist(instance):
465   """Check that the brigdes needed by an instance exist.
466
467   """
468   # check bridges existance
469   brlist = [nic.bridge for nic in instance.nics]
470   if not rpc.call_bridges_exist(instance.primary_node, brlist):
471     raise errors.OpPrereqError("one or more target bridges %s does not"
472                                " exist on destination node '%s'" %
473                                (brlist, instance.primary_node))
474
475
476 class LUInitCluster(LogicalUnit):
477   """Initialise the cluster.
478
479   """
480   HPATH = "cluster-init"
481   HTYPE = constants.HTYPE_CLUSTER
482   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
483               "def_bridge", "master_netdev"]
484   REQ_CLUSTER = False
485
486   def BuildHooksEnv(self):
487     """Build hooks env.
488
489     Notes: Since we don't require a cluster, we must manually add
490     ourselves in the post-run node list.
491
492     """
493     env = {"OP_TARGET": self.op.cluster_name}
494     return env, [], [self.hostname.name]
495
496   def CheckPrereq(self):
497     """Verify that the passed name is a valid one.
498
499     """
500     if config.ConfigWriter.IsCluster():
501       raise errors.OpPrereqError("Cluster is already initialised")
502
503     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
504       if not os.path.exists(constants.VNC_PASSWORD_FILE):
505         raise errors.OpPrereqError("Please prepare the cluster VNC"
506                                    "password file %s" %
507                                    constants.VNC_PASSWORD_FILE)
508
509     self.hostname = hostname = utils.HostInfo()
510
511     if hostname.ip.startswith("127."):
512       raise errors.OpPrereqError("This host's IP resolves to the private"
513                                  " range (%s). Please fix DNS or %s." %
514                                  (hostname.ip, constants.ETC_HOSTS))
515
516     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
517
518     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
519                          constants.DEFAULT_NODED_PORT):
520       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
521                                  " to %s,\nbut this ip address does not"
522                                  " belong to this host."
523                                  " Aborting." % hostname.ip)
524
525     secondary_ip = getattr(self.op, "secondary_ip", None)
526     if secondary_ip and not utils.IsValidIP(secondary_ip):
527       raise errors.OpPrereqError("Invalid secondary ip given")
528     if (secondary_ip and
529         secondary_ip != hostname.ip and
530         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
531                            constants.DEFAULT_NODED_PORT))):
532       raise errors.OpPrereqError("You gave %s as secondary IP,"
533                                  " but it does not belong to this host." %
534                                  secondary_ip)
535     self.secondary_ip = secondary_ip
536
537     # checks presence of the volume group given
538     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
539
540     if vgstatus:
541       raise errors.OpPrereqError("Error: %s" % vgstatus)
542
543     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
544                     self.op.mac_prefix):
545       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
546                                  self.op.mac_prefix)
547
548     if self.op.hypervisor_type not in constants.HYPER_TYPES:
549       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
550                                  self.op.hypervisor_type)
551
552     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
553     if result.failed:
554       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
555                                  (self.op.master_netdev,
556                                   result.output.strip()))
557
558     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
559             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
560       raise errors.OpPrereqError("Init.d script '%s' missing or not"
561                                  " executable." % constants.NODE_INITD_SCRIPT)
562
563   def Exec(self, feedback_fn):
564     """Initialize the cluster.
565
566     """
567     clustername = self.clustername
568     hostname = self.hostname
569
570     # set up the simple store
571     self.sstore = ss = ssconf.SimpleStore()
572     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
573     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
574     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
575     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
576     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
577
578     # set up the inter-node password and certificate
579     _InitGanetiServerSetup(ss)
580
581     # start the master ip
582     rpc.call_node_start_master(hostname.name)
583
584     # set up ssh config and /etc/hosts
585     f = open(constants.SSH_HOST_RSA_PUB, 'r')
586     try:
587       sshline = f.read()
588     finally:
589       f.close()
590     sshkey = sshline.split(" ")[1]
591
592     _AddHostToEtcHosts(hostname.name)
593
594     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
595
596     _InitSSHSetup(hostname.name)
597
598     # init of cluster config file
599     self.cfg = cfgw = config.ConfigWriter()
600     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
601                     sshkey, self.op.mac_prefix,
602                     self.op.vg_name, self.op.def_bridge)
603
604
605 class LUDestroyCluster(NoHooksLU):
606   """Logical unit for destroying the cluster.
607
608   """
609   _OP_REQP = []
610
611   def CheckPrereq(self):
612     """Check prerequisites.
613
614     This checks whether the cluster is empty.
615
616     Any errors are signalled by raising errors.OpPrereqError.
617
618     """
619     master = self.sstore.GetMasterNode()
620
621     nodelist = self.cfg.GetNodeList()
622     if len(nodelist) != 1 or nodelist[0] != master:
623       raise errors.OpPrereqError("There are still %d node(s) in"
624                                  " this cluster." % (len(nodelist) - 1))
625     instancelist = self.cfg.GetInstanceList()
626     if instancelist:
627       raise errors.OpPrereqError("There are still %d instance(s) in"
628                                  " this cluster." % len(instancelist))
629
630   def Exec(self, feedback_fn):
631     """Destroys the cluster.
632
633     """
634     master = self.sstore.GetMasterNode()
635     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
636     utils.CreateBackup(priv_key)
637     utils.CreateBackup(pub_key)
638     rpc.call_node_leave_cluster(master)
639
640
641 class LUVerifyCluster(NoHooksLU):
642   """Verifies the cluster status.
643
644   """
645   _OP_REQP = []
646
647   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
648                   remote_version, feedback_fn):
649     """Run multiple tests against a node.
650
651     Test list:
652       - compares ganeti version
653       - checks vg existance and size > 20G
654       - checks config file checksum
655       - checks ssh to other nodes
656
657     Args:
658       node: name of the node to check
659       file_list: required list of files
660       local_cksum: dictionary of local files and their checksums
661
662     """
663     # compares ganeti version
664     local_version = constants.PROTOCOL_VERSION
665     if not remote_version:
666       feedback_fn(" - ERROR: connection to %s failed" % (node))
667       return True
668
669     if local_version != remote_version:
670       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
671                       (local_version, node, remote_version))
672       return True
673
674     # checks vg existance and size > 20G
675
676     bad = False
677     if not vglist:
678       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
679                       (node,))
680       bad = True
681     else:
682       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
683       if vgstatus:
684         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
685         bad = True
686
687     # checks config file checksum
688     # checks ssh to any
689
690     if 'filelist' not in node_result:
691       bad = True
692       feedback_fn("  - ERROR: node hasn't returned file checksum data")
693     else:
694       remote_cksum = node_result['filelist']
695       for file_name in file_list:
696         if file_name not in remote_cksum:
697           bad = True
698           feedback_fn("  - ERROR: file '%s' missing" % file_name)
699         elif remote_cksum[file_name] != local_cksum[file_name]:
700           bad = True
701           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
702
703     if 'nodelist' not in node_result:
704       bad = True
705       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
706     else:
707       if node_result['nodelist']:
708         bad = True
709         for node in node_result['nodelist']:
710           feedback_fn("  - ERROR: communication with node '%s': %s" %
711                           (node, node_result['nodelist'][node]))
712     hyp_result = node_result.get('hypervisor', None)
713     if hyp_result is not None:
714       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
715     return bad
716
717   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
718     """Verify an instance.
719
720     This function checks to see if the required block devices are
721     available on the instance's node.
722
723     """
724     bad = False
725
726     instancelist = self.cfg.GetInstanceList()
727     if not instance in instancelist:
728       feedback_fn("  - ERROR: instance %s not in instance list %s" %
729                       (instance, instancelist))
730       bad = True
731
732     instanceconfig = self.cfg.GetInstanceInfo(instance)
733     node_current = instanceconfig.primary_node
734
735     node_vol_should = {}
736     instanceconfig.MapLVsByNode(node_vol_should)
737
738     for node in node_vol_should:
739       for volume in node_vol_should[node]:
740         if node not in node_vol_is or volume not in node_vol_is[node]:
741           feedback_fn("  - ERROR: volume %s missing on node %s" %
742                           (volume, node))
743           bad = True
744
745     if not instanceconfig.status == 'down':
746       if not instance in node_instance[node_current]:
747         feedback_fn("  - ERROR: instance %s not running on node %s" %
748                         (instance, node_current))
749         bad = True
750
751     for node in node_instance:
752       if (not node == node_current):
753         if instance in node_instance[node]:
754           feedback_fn("  - ERROR: instance %s should not run on node %s" %
755                           (instance, node))
756           bad = True
757
758     return bad
759
760   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
761     """Verify if there are any unknown volumes in the cluster.
762
763     The .os, .swap and backup volumes are ignored. All other volumes are
764     reported as unknown.
765
766     """
767     bad = False
768
769     for node in node_vol_is:
770       for volume in node_vol_is[node]:
771         if node not in node_vol_should or volume not in node_vol_should[node]:
772           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
773                       (volume, node))
774           bad = True
775     return bad
776
777   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
778     """Verify the list of running instances.
779
780     This checks what instances are running but unknown to the cluster.
781
782     """
783     bad = False
784     for node in node_instance:
785       for runninginstance in node_instance[node]:
786         if runninginstance not in instancelist:
787           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
788                           (runninginstance, node))
789           bad = True
790     return bad
791
792   def CheckPrereq(self):
793     """Check prerequisites.
794
795     This has no prerequisites.
796
797     """
798     pass
799
800   def Exec(self, feedback_fn):
801     """Verify integrity of cluster, performing various test on nodes.
802
803     """
804     bad = False
805     feedback_fn("* Verifying global settings")
806     for msg in self.cfg.VerifyConfig():
807       feedback_fn("  - ERROR: %s" % msg)
808
809     vg_name = self.cfg.GetVGName()
810     nodelist = utils.NiceSort(self.cfg.GetNodeList())
811     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
812     node_volume = {}
813     node_instance = {}
814
815     # FIXME: verify OS list
816     # do local checksums
817     file_names = list(self.sstore.GetFileList())
818     file_names.append(constants.SSL_CERT_FILE)
819     file_names.append(constants.CLUSTER_CONF_FILE)
820     local_checksums = utils.FingerprintFiles(file_names)
821
822     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
823     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
824     all_instanceinfo = rpc.call_instance_list(nodelist)
825     all_vglist = rpc.call_vg_list(nodelist)
826     node_verify_param = {
827       'filelist': file_names,
828       'nodelist': nodelist,
829       'hypervisor': None,
830       }
831     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
832     all_rversion = rpc.call_version(nodelist)
833
834     for node in nodelist:
835       feedback_fn("* Verifying node %s" % node)
836       result = self._VerifyNode(node, file_names, local_checksums,
837                                 all_vglist[node], all_nvinfo[node],
838                                 all_rversion[node], feedback_fn)
839       bad = bad or result
840
841       # node_volume
842       volumeinfo = all_volumeinfo[node]
843
844       if isinstance(volumeinfo, basestring):
845         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
846                     (node, volumeinfo[-400:].encode('string_escape')))
847         bad = True
848         node_volume[node] = {}
849       elif not isinstance(volumeinfo, dict):
850         feedback_fn("  - ERROR: connection to %s failed" % (node,))
851         bad = True
852         continue
853       else:
854         node_volume[node] = volumeinfo
855
856       # node_instance
857       nodeinstance = all_instanceinfo[node]
858       if type(nodeinstance) != list:
859         feedback_fn("  - ERROR: connection to %s failed" % (node,))
860         bad = True
861         continue
862
863       node_instance[node] = nodeinstance
864
865     node_vol_should = {}
866
867     for instance in instancelist:
868       feedback_fn("* Verifying instance %s" % instance)
869       result =  self._VerifyInstance(instance, node_volume, node_instance,
870                                      feedback_fn)
871       bad = bad or result
872
873       inst_config = self.cfg.GetInstanceInfo(instance)
874
875       inst_config.MapLVsByNode(node_vol_should)
876
877     feedback_fn("* Verifying orphan volumes")
878     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
879                                        feedback_fn)
880     bad = bad or result
881
882     feedback_fn("* Verifying remaining instances")
883     result = self._VerifyOrphanInstances(instancelist, node_instance,
884                                          feedback_fn)
885     bad = bad or result
886
887     return int(bad)
888
889
890 class LUVerifyDisks(NoHooksLU):
891   """Verifies the cluster disks status.
892
893   """
894   _OP_REQP = []
895
896   def CheckPrereq(self):
897     """Check prerequisites.
898
899     This has no prerequisites.
900
901     """
902     pass
903
904   def Exec(self, feedback_fn):
905     """Verify integrity of cluster disks.
906
907     """
908     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
909
910     vg_name = self.cfg.GetVGName()
911     nodes = utils.NiceSort(self.cfg.GetNodeList())
912     instances = [self.cfg.GetInstanceInfo(name)
913                  for name in self.cfg.GetInstanceList()]
914
915     nv_dict = {}
916     for inst in instances:
917       inst_lvs = {}
918       if (inst.status != "up" or
919           inst.disk_template not in constants.DTS_NET_MIRROR):
920         continue
921       inst.MapLVsByNode(inst_lvs)
922       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
923       for node, vol_list in inst_lvs.iteritems():
924         for vol in vol_list:
925           nv_dict[(node, vol)] = inst
926
927     if not nv_dict:
928       return result
929
930     node_lvs = rpc.call_volume_list(nodes, vg_name)
931
932     to_act = set()
933     for node in nodes:
934       # node_volume
935       lvs = node_lvs[node]
936
937       if isinstance(lvs, basestring):
938         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
939         res_nlvm[node] = lvs
940       elif not isinstance(lvs, dict):
941         logger.Info("connection to node %s failed or invalid data returned" %
942                     (node,))
943         res_nodes.append(node)
944         continue
945
946       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
947         inst = nv_dict.pop((node, lv_name), None)
948         if (not lv_online and inst is not None
949             and inst.name not in res_instances):
950           res_instances.append(inst.name)
951
952     # any leftover items in nv_dict are missing LVs, let's arrange the
953     # data better
954     for key, inst in nv_dict.iteritems():
955       if inst.name not in res_missing:
956         res_missing[inst.name] = []
957       res_missing[inst.name].append(key)
958
959     return result
960
961
962 class LURenameCluster(LogicalUnit):
963   """Rename the cluster.
964
965   """
966   HPATH = "cluster-rename"
967   HTYPE = constants.HTYPE_CLUSTER
968   _OP_REQP = ["name"]
969
970   def BuildHooksEnv(self):
971     """Build hooks env.
972
973     """
974     env = {
975       "OP_TARGET": self.op.sstore.GetClusterName(),
976       "NEW_NAME": self.op.name,
977       }
978     mn = self.sstore.GetMasterNode()
979     return env, [mn], [mn]
980
981   def CheckPrereq(self):
982     """Verify that the passed name is a valid one.
983
984     """
985     hostname = utils.HostInfo(self.op.name)
986
987     new_name = hostname.name
988     self.ip = new_ip = hostname.ip
989     old_name = self.sstore.GetClusterName()
990     old_ip = self.sstore.GetMasterIP()
991     if new_name == old_name and new_ip == old_ip:
992       raise errors.OpPrereqError("Neither the name nor the IP address of the"
993                                  " cluster has changed")
994     if new_ip != old_ip:
995       result = utils.RunCmd(["fping", "-q", new_ip])
996       if not result.failed:
997         raise errors.OpPrereqError("The given cluster IP address (%s) is"
998                                    " reachable on the network. Aborting." %
999                                    new_ip)
1000
1001     self.op.name = new_name
1002
1003   def Exec(self, feedback_fn):
1004     """Rename the cluster.
1005
1006     """
1007     clustername = self.op.name
1008     ip = self.ip
1009     ss = self.sstore
1010
1011     # shutdown the master IP
1012     master = ss.GetMasterNode()
1013     if not rpc.call_node_stop_master(master):
1014       raise errors.OpExecError("Could not disable the master role")
1015
1016     try:
1017       # modify the sstore
1018       ss.SetKey(ss.SS_MASTER_IP, ip)
1019       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1020
1021       # Distribute updated ss config to all nodes
1022       myself = self.cfg.GetNodeInfo(master)
1023       dist_nodes = self.cfg.GetNodeList()
1024       if myself.name in dist_nodes:
1025         dist_nodes.remove(myself.name)
1026
1027       logger.Debug("Copying updated ssconf data to all nodes")
1028       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1029         fname = ss.KeyToFilename(keyname)
1030         result = rpc.call_upload_file(dist_nodes, fname)
1031         for to_node in dist_nodes:
1032           if not result[to_node]:
1033             logger.Error("copy of file %s to node %s failed" %
1034                          (fname, to_node))
1035     finally:
1036       if not rpc.call_node_start_master(master):
1037         logger.Error("Could not re-enable the master role on the master,"
1038                      " please restart manually.")
1039
1040
1041 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1042   """Sleep and poll for an instance's disk to sync.
1043
1044   """
1045   if not instance.disks:
1046     return True
1047
1048   if not oneshot:
1049     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1050
1051   node = instance.primary_node
1052
1053   for dev in instance.disks:
1054     cfgw.SetDiskID(dev, node)
1055
1056   retries = 0
1057   while True:
1058     max_time = 0
1059     done = True
1060     cumul_degraded = False
1061     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1062     if not rstats:
1063       proc.LogWarning("Can't get any data from node %s" % node)
1064       retries += 1
1065       if retries >= 10:
1066         raise errors.RemoteError("Can't contact node %s for mirror data,"
1067                                  " aborting." % node)
1068       time.sleep(6)
1069       continue
1070     retries = 0
1071     for i in range(len(rstats)):
1072       mstat = rstats[i]
1073       if mstat is None:
1074         proc.LogWarning("Can't compute data for node %s/%s" %
1075                         (node, instance.disks[i].iv_name))
1076         continue
1077       # we ignore the ldisk parameter
1078       perc_done, est_time, is_degraded, _ = mstat
1079       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1080       if perc_done is not None:
1081         done = False
1082         if est_time is not None:
1083           rem_time = "%d estimated seconds remaining" % est_time
1084           max_time = est_time
1085         else:
1086           rem_time = "no time estimate"
1087         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1088                      (instance.disks[i].iv_name, perc_done, rem_time))
1089     if done or oneshot:
1090       break
1091
1092     if unlock:
1093       utils.Unlock('cmd')
1094     try:
1095       time.sleep(min(60, max_time))
1096     finally:
1097       if unlock:
1098         utils.Lock('cmd')
1099
1100   if done:
1101     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1102   return not cumul_degraded
1103
1104
1105 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1106   """Check that mirrors are not degraded.
1107
1108   The ldisk parameter, if True, will change the test from the
1109   is_degraded attribute (which represents overall non-ok status for
1110   the device(s)) to the ldisk (representing the local storage status).
1111
1112   """
1113   cfgw.SetDiskID(dev, node)
1114   if ldisk:
1115     idx = 6
1116   else:
1117     idx = 5
1118
1119   result = True
1120   if on_primary or dev.AssembleOnSecondary():
1121     rstats = rpc.call_blockdev_find(node, dev)
1122     if not rstats:
1123       logger.ToStderr("Can't get any data from node %s" % node)
1124       result = False
1125     else:
1126       result = result and (not rstats[idx])
1127   if dev.children:
1128     for child in dev.children:
1129       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1130
1131   return result
1132
1133
1134 class LUDiagnoseOS(NoHooksLU):
1135   """Logical unit for OS diagnose/query.
1136
1137   """
1138   _OP_REQP = []
1139
1140   def CheckPrereq(self):
1141     """Check prerequisites.
1142
1143     This always succeeds, since this is a pure query LU.
1144
1145     """
1146     return
1147
1148   def Exec(self, feedback_fn):
1149     """Compute the list of OSes.
1150
1151     """
1152     node_list = self.cfg.GetNodeList()
1153     node_data = rpc.call_os_diagnose(node_list)
1154     if node_data == False:
1155       raise errors.OpExecError("Can't gather the list of OSes")
1156     return node_data
1157
1158
1159 class LURemoveNode(LogicalUnit):
1160   """Logical unit for removing a node.
1161
1162   """
1163   HPATH = "node-remove"
1164   HTYPE = constants.HTYPE_NODE
1165   _OP_REQP = ["node_name"]
1166
1167   def BuildHooksEnv(self):
1168     """Build hooks env.
1169
1170     This doesn't run on the target node in the pre phase as a failed
1171     node would not allows itself to run.
1172
1173     """
1174     env = {
1175       "OP_TARGET": self.op.node_name,
1176       "NODE_NAME": self.op.node_name,
1177       }
1178     all_nodes = self.cfg.GetNodeList()
1179     all_nodes.remove(self.op.node_name)
1180     return env, all_nodes, all_nodes
1181
1182   def CheckPrereq(self):
1183     """Check prerequisites.
1184
1185     This checks:
1186      - the node exists in the configuration
1187      - it does not have primary or secondary instances
1188      - it's not the master
1189
1190     Any errors are signalled by raising errors.OpPrereqError.
1191
1192     """
1193     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1194     if node is None:
1195       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1196
1197     instance_list = self.cfg.GetInstanceList()
1198
1199     masternode = self.sstore.GetMasterNode()
1200     if node.name == masternode:
1201       raise errors.OpPrereqError("Node is the master node,"
1202                                  " you need to failover first.")
1203
1204     for instance_name in instance_list:
1205       instance = self.cfg.GetInstanceInfo(instance_name)
1206       if node.name == instance.primary_node:
1207         raise errors.OpPrereqError("Instance %s still running on the node,"
1208                                    " please remove first." % instance_name)
1209       if node.name in instance.secondary_nodes:
1210         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1211                                    " please remove first." % instance_name)
1212     self.op.node_name = node.name
1213     self.node = node
1214
1215   def Exec(self, feedback_fn):
1216     """Removes the node from the cluster.
1217
1218     """
1219     node = self.node
1220     logger.Info("stopping the node daemon and removing configs from node %s" %
1221                 node.name)
1222
1223     rpc.call_node_leave_cluster(node.name)
1224
1225     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1226
1227     logger.Info("Removing node %s from config" % node.name)
1228
1229     self.cfg.RemoveNode(node.name)
1230
1231     _RemoveHostFromEtcHosts(node.name)
1232
1233
1234 class LUQueryNodes(NoHooksLU):
1235   """Logical unit for querying nodes.
1236
1237   """
1238   _OP_REQP = ["output_fields", "names"]
1239
1240   def CheckPrereq(self):
1241     """Check prerequisites.
1242
1243     This checks that the fields required are valid output fields.
1244
1245     """
1246     self.dynamic_fields = frozenset(["dtotal", "dfree",
1247                                      "mtotal", "mnode", "mfree",
1248                                      "bootid"])
1249
1250     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1251                                "pinst_list", "sinst_list",
1252                                "pip", "sip"],
1253                        dynamic=self.dynamic_fields,
1254                        selected=self.op.output_fields)
1255
1256     self.wanted = _GetWantedNodes(self, self.op.names)
1257
1258   def Exec(self, feedback_fn):
1259     """Computes the list of nodes and their attributes.
1260
1261     """
1262     nodenames = self.wanted
1263     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1264
1265     # begin data gathering
1266
1267     if self.dynamic_fields.intersection(self.op.output_fields):
1268       live_data = {}
1269       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1270       for name in nodenames:
1271         nodeinfo = node_data.get(name, None)
1272         if nodeinfo:
1273           live_data[name] = {
1274             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1275             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1276             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1277             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1278             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1279             "bootid": nodeinfo['bootid'],
1280             }
1281         else:
1282           live_data[name] = {}
1283     else:
1284       live_data = dict.fromkeys(nodenames, {})
1285
1286     node_to_primary = dict([(name, set()) for name in nodenames])
1287     node_to_secondary = dict([(name, set()) for name in nodenames])
1288
1289     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1290                              "sinst_cnt", "sinst_list"))
1291     if inst_fields & frozenset(self.op.output_fields):
1292       instancelist = self.cfg.GetInstanceList()
1293
1294       for instance_name in instancelist:
1295         inst = self.cfg.GetInstanceInfo(instance_name)
1296         if inst.primary_node in node_to_primary:
1297           node_to_primary[inst.primary_node].add(inst.name)
1298         for secnode in inst.secondary_nodes:
1299           if secnode in node_to_secondary:
1300             node_to_secondary[secnode].add(inst.name)
1301
1302     # end data gathering
1303
1304     output = []
1305     for node in nodelist:
1306       node_output = []
1307       for field in self.op.output_fields:
1308         if field == "name":
1309           val = node.name
1310         elif field == "pinst_list":
1311           val = list(node_to_primary[node.name])
1312         elif field == "sinst_list":
1313           val = list(node_to_secondary[node.name])
1314         elif field == "pinst_cnt":
1315           val = len(node_to_primary[node.name])
1316         elif field == "sinst_cnt":
1317           val = len(node_to_secondary[node.name])
1318         elif field == "pip":
1319           val = node.primary_ip
1320         elif field == "sip":
1321           val = node.secondary_ip
1322         elif field in self.dynamic_fields:
1323           val = live_data[node.name].get(field, None)
1324         else:
1325           raise errors.ParameterError(field)
1326         node_output.append(val)
1327       output.append(node_output)
1328
1329     return output
1330
1331
1332 class LUQueryNodeVolumes(NoHooksLU):
1333   """Logical unit for getting volumes on node(s).
1334
1335   """
1336   _OP_REQP = ["nodes", "output_fields"]
1337
1338   def CheckPrereq(self):
1339     """Check prerequisites.
1340
1341     This checks that the fields required are valid output fields.
1342
1343     """
1344     self.nodes = _GetWantedNodes(self, self.op.nodes)
1345
1346     _CheckOutputFields(static=["node"],
1347                        dynamic=["phys", "vg", "name", "size", "instance"],
1348                        selected=self.op.output_fields)
1349
1350
1351   def Exec(self, feedback_fn):
1352     """Computes the list of nodes and their attributes.
1353
1354     """
1355     nodenames = self.nodes
1356     volumes = rpc.call_node_volumes(nodenames)
1357
1358     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1359              in self.cfg.GetInstanceList()]
1360
1361     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1362
1363     output = []
1364     for node in nodenames:
1365       if node not in volumes or not volumes[node]:
1366         continue
1367
1368       node_vols = volumes[node][:]
1369       node_vols.sort(key=lambda vol: vol['dev'])
1370
1371       for vol in node_vols:
1372         node_output = []
1373         for field in self.op.output_fields:
1374           if field == "node":
1375             val = node
1376           elif field == "phys":
1377             val = vol['dev']
1378           elif field == "vg":
1379             val = vol['vg']
1380           elif field == "name":
1381             val = vol['name']
1382           elif field == "size":
1383             val = int(float(vol['size']))
1384           elif field == "instance":
1385             for inst in ilist:
1386               if node not in lv_by_node[inst]:
1387                 continue
1388               if vol['name'] in lv_by_node[inst][node]:
1389                 val = inst.name
1390                 break
1391             else:
1392               val = '-'
1393           else:
1394             raise errors.ParameterError(field)
1395           node_output.append(str(val))
1396
1397         output.append(node_output)
1398
1399     return output
1400
1401
1402 class LUAddNode(LogicalUnit):
1403   """Logical unit for adding node to the cluster.
1404
1405   """
1406   HPATH = "node-add"
1407   HTYPE = constants.HTYPE_NODE
1408   _OP_REQP = ["node_name"]
1409
1410   def BuildHooksEnv(self):
1411     """Build hooks env.
1412
1413     This will run on all nodes before, and on all nodes + the new node after.
1414
1415     """
1416     env = {
1417       "OP_TARGET": self.op.node_name,
1418       "NODE_NAME": self.op.node_name,
1419       "NODE_PIP": self.op.primary_ip,
1420       "NODE_SIP": self.op.secondary_ip,
1421       }
1422     nodes_0 = self.cfg.GetNodeList()
1423     nodes_1 = nodes_0 + [self.op.node_name, ]
1424     return env, nodes_0, nodes_1
1425
1426   def CheckPrereq(self):
1427     """Check prerequisites.
1428
1429     This checks:
1430      - the new node is not already in the config
1431      - it is resolvable
1432      - its parameters (single/dual homed) matches the cluster
1433
1434     Any errors are signalled by raising errors.OpPrereqError.
1435
1436     """
1437     node_name = self.op.node_name
1438     cfg = self.cfg
1439
1440     dns_data = utils.HostInfo(node_name)
1441
1442     node = dns_data.name
1443     primary_ip = self.op.primary_ip = dns_data.ip
1444     secondary_ip = getattr(self.op, "secondary_ip", None)
1445     if secondary_ip is None:
1446       secondary_ip = primary_ip
1447     if not utils.IsValidIP(secondary_ip):
1448       raise errors.OpPrereqError("Invalid secondary IP given")
1449     self.op.secondary_ip = secondary_ip
1450     node_list = cfg.GetNodeList()
1451     if node in node_list:
1452       raise errors.OpPrereqError("Node %s is already in the configuration"
1453                                  % node)
1454
1455     for existing_node_name in node_list:
1456       existing_node = cfg.GetNodeInfo(existing_node_name)
1457       if (existing_node.primary_ip == primary_ip or
1458           existing_node.secondary_ip == primary_ip or
1459           existing_node.primary_ip == secondary_ip or
1460           existing_node.secondary_ip == secondary_ip):
1461         raise errors.OpPrereqError("New node ip address(es) conflict with"
1462                                    " existing node %s" % existing_node.name)
1463
1464     # check that the type of the node (single versus dual homed) is the
1465     # same as for the master
1466     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1467     master_singlehomed = myself.secondary_ip == myself.primary_ip
1468     newbie_singlehomed = secondary_ip == primary_ip
1469     if master_singlehomed != newbie_singlehomed:
1470       if master_singlehomed:
1471         raise errors.OpPrereqError("The master has no private ip but the"
1472                                    " new node has one")
1473       else:
1474         raise errors.OpPrereqError("The master has a private ip but the"
1475                                    " new node doesn't have one")
1476
1477     # checks reachablity
1478     if not utils.TcpPing(utils.HostInfo().name,
1479                          primary_ip,
1480                          constants.DEFAULT_NODED_PORT):
1481       raise errors.OpPrereqError("Node not reachable by ping")
1482
1483     if not newbie_singlehomed:
1484       # check reachability from my secondary ip to newbie's secondary ip
1485       if not utils.TcpPing(myself.secondary_ip,
1486                            secondary_ip,
1487                            constants.DEFAULT_NODED_PORT):
1488         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1489                                    " based ping to noded port")
1490
1491     self.new_node = objects.Node(name=node,
1492                                  primary_ip=primary_ip,
1493                                  secondary_ip=secondary_ip)
1494
1495     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1496       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1497         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1498                                    constants.VNC_PASSWORD_FILE)
1499
1500   def Exec(self, feedback_fn):
1501     """Adds the new node to the cluster.
1502
1503     """
1504     new_node = self.new_node
1505     node = new_node.name
1506
1507     # set up inter-node password and certificate and restarts the node daemon
1508     gntpass = self.sstore.GetNodeDaemonPassword()
1509     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1510       raise errors.OpExecError("ganeti password corruption detected")
1511     f = open(constants.SSL_CERT_FILE)
1512     try:
1513       gntpem = f.read(8192)
1514     finally:
1515       f.close()
1516     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1517     # so we use this to detect an invalid certificate; as long as the
1518     # cert doesn't contain this, the here-document will be correctly
1519     # parsed by the shell sequence below
1520     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1521       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1522     if not gntpem.endswith("\n"):
1523       raise errors.OpExecError("PEM must end with newline")
1524     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1525
1526     # and then connect with ssh to set password and start ganeti-noded
1527     # note that all the below variables are sanitized at this point,
1528     # either by being constants or by the checks above
1529     ss = self.sstore
1530     mycommand = ("umask 077 && "
1531                  "echo '%s' > '%s' && "
1532                  "cat > '%s' << '!EOF.' && \n"
1533                  "%s!EOF.\n%s restart" %
1534                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1535                   constants.SSL_CERT_FILE, gntpem,
1536                   constants.NODE_INITD_SCRIPT))
1537
1538     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1539     if result.failed:
1540       raise errors.OpExecError("Remote command on node %s, error: %s,"
1541                                " output: %s" %
1542                                (node, result.fail_reason, result.output))
1543
1544     # check connectivity
1545     time.sleep(4)
1546
1547     result = rpc.call_version([node])[node]
1548     if result:
1549       if constants.PROTOCOL_VERSION == result:
1550         logger.Info("communication to node %s fine, sw version %s match" %
1551                     (node, result))
1552       else:
1553         raise errors.OpExecError("Version mismatch master version %s,"
1554                                  " node version %s" %
1555                                  (constants.PROTOCOL_VERSION, result))
1556     else:
1557       raise errors.OpExecError("Cannot get version from the new node")
1558
1559     # setup ssh on node
1560     logger.Info("copy ssh key to node %s" % node)
1561     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1562     keyarray = []
1563     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1564                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1565                 priv_key, pub_key]
1566
1567     for i in keyfiles:
1568       f = open(i, 'r')
1569       try:
1570         keyarray.append(f.read())
1571       finally:
1572         f.close()
1573
1574     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1575                                keyarray[3], keyarray[4], keyarray[5])
1576
1577     if not result:
1578       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1579
1580     # Add node to our /etc/hosts, and add key to known_hosts
1581     _AddHostToEtcHosts(new_node.name)
1582
1583     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1584                       self.cfg.GetHostKey())
1585
1586     if new_node.secondary_ip != new_node.primary_ip:
1587       if not rpc.call_node_tcp_ping(new_node.name,
1588                                     constants.LOCALHOST_IP_ADDRESS,
1589                                     new_node.secondary_ip,
1590                                     constants.DEFAULT_NODED_PORT,
1591                                     10, False):
1592         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1593                                  " you gave (%s). Please fix and re-run this"
1594                                  " command." % new_node.secondary_ip)
1595
1596     success, msg = ssh.VerifyNodeHostname(node)
1597     if not success:
1598       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1599                                " than the one the resolver gives: %s."
1600                                " Please fix and re-run this command." %
1601                                (node, msg))
1602
1603     # Distribute updated /etc/hosts and known_hosts to all nodes,
1604     # including the node just added
1605     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1606     dist_nodes = self.cfg.GetNodeList() + [node]
1607     if myself.name in dist_nodes:
1608       dist_nodes.remove(myself.name)
1609
1610     logger.Debug("Copying hosts and known_hosts to all nodes")
1611     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1612       result = rpc.call_upload_file(dist_nodes, fname)
1613       for to_node in dist_nodes:
1614         if not result[to_node]:
1615           logger.Error("copy of file %s to node %s failed" %
1616                        (fname, to_node))
1617
1618     to_copy = ss.GetFileList()
1619     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1620       to_copy.append(constants.VNC_PASSWORD_FILE)
1621     for fname in to_copy:
1622       if not ssh.CopyFileToNode(node, fname):
1623         logger.Error("could not copy file %s to node %s" % (fname, node))
1624
1625     logger.Info("adding node %s to cluster.conf" % node)
1626     self.cfg.AddNode(new_node)
1627
1628
1629 class LUMasterFailover(LogicalUnit):
1630   """Failover the master node to the current node.
1631
1632   This is a special LU in that it must run on a non-master node.
1633
1634   """
1635   HPATH = "master-failover"
1636   HTYPE = constants.HTYPE_CLUSTER
1637   REQ_MASTER = False
1638   _OP_REQP = []
1639
1640   def BuildHooksEnv(self):
1641     """Build hooks env.
1642
1643     This will run on the new master only in the pre phase, and on all
1644     the nodes in the post phase.
1645
1646     """
1647     env = {
1648       "OP_TARGET": self.new_master,
1649       "NEW_MASTER": self.new_master,
1650       "OLD_MASTER": self.old_master,
1651       }
1652     return env, [self.new_master], self.cfg.GetNodeList()
1653
1654   def CheckPrereq(self):
1655     """Check prerequisites.
1656
1657     This checks that we are not already the master.
1658
1659     """
1660     self.new_master = utils.HostInfo().name
1661     self.old_master = self.sstore.GetMasterNode()
1662
1663     if self.old_master == self.new_master:
1664       raise errors.OpPrereqError("This commands must be run on the node"
1665                                  " where you want the new master to be."
1666                                  " %s is already the master" %
1667                                  self.old_master)
1668
1669   def Exec(self, feedback_fn):
1670     """Failover the master node.
1671
1672     This command, when run on a non-master node, will cause the current
1673     master to cease being master, and the non-master to become new
1674     master.
1675
1676     """
1677     #TODO: do not rely on gethostname returning the FQDN
1678     logger.Info("setting master to %s, old master: %s" %
1679                 (self.new_master, self.old_master))
1680
1681     if not rpc.call_node_stop_master(self.old_master):
1682       logger.Error("could disable the master role on the old master"
1683                    " %s, please disable manually" % self.old_master)
1684
1685     ss = self.sstore
1686     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1687     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1688                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1689       logger.Error("could not distribute the new simple store master file"
1690                    " to the other nodes, please check.")
1691
1692     if not rpc.call_node_start_master(self.new_master):
1693       logger.Error("could not start the master role on the new master"
1694                    " %s, please check" % self.new_master)
1695       feedback_fn("Error in activating the master IP on the new master,"
1696                   " please fix manually.")
1697
1698
1699
1700 class LUQueryClusterInfo(NoHooksLU):
1701   """Query cluster configuration.
1702
1703   """
1704   _OP_REQP = []
1705   REQ_MASTER = False
1706
1707   def CheckPrereq(self):
1708     """No prerequsites needed for this LU.
1709
1710     """
1711     pass
1712
1713   def Exec(self, feedback_fn):
1714     """Return cluster config.
1715
1716     """
1717     result = {
1718       "name": self.sstore.GetClusterName(),
1719       "software_version": constants.RELEASE_VERSION,
1720       "protocol_version": constants.PROTOCOL_VERSION,
1721       "config_version": constants.CONFIG_VERSION,
1722       "os_api_version": constants.OS_API_VERSION,
1723       "export_version": constants.EXPORT_VERSION,
1724       "master": self.sstore.GetMasterNode(),
1725       "architecture": (platform.architecture()[0], platform.machine()),
1726       }
1727
1728     return result
1729
1730
1731 class LUClusterCopyFile(NoHooksLU):
1732   """Copy file to cluster.
1733
1734   """
1735   _OP_REQP = ["nodes", "filename"]
1736
1737   def CheckPrereq(self):
1738     """Check prerequisites.
1739
1740     It should check that the named file exists and that the given list
1741     of nodes is valid.
1742
1743     """
1744     if not os.path.exists(self.op.filename):
1745       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1746
1747     self.nodes = _GetWantedNodes(self, self.op.nodes)
1748
1749   def Exec(self, feedback_fn):
1750     """Copy a file from master to some nodes.
1751
1752     Args:
1753       opts - class with options as members
1754       args - list containing a single element, the file name
1755     Opts used:
1756       nodes - list containing the name of target nodes; if empty, all nodes
1757
1758     """
1759     filename = self.op.filename
1760
1761     myname = utils.HostInfo().name
1762
1763     for node in self.nodes:
1764       if node == myname:
1765         continue
1766       if not ssh.CopyFileToNode(node, filename):
1767         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1768
1769
1770 class LUDumpClusterConfig(NoHooksLU):
1771   """Return a text-representation of the cluster-config.
1772
1773   """
1774   _OP_REQP = []
1775
1776   def CheckPrereq(self):
1777     """No prerequisites.
1778
1779     """
1780     pass
1781
1782   def Exec(self, feedback_fn):
1783     """Dump a representation of the cluster config to the standard output.
1784
1785     """
1786     return self.cfg.DumpConfig()
1787
1788
1789 class LURunClusterCommand(NoHooksLU):
1790   """Run a command on some nodes.
1791
1792   """
1793   _OP_REQP = ["command", "nodes"]
1794
1795   def CheckPrereq(self):
1796     """Check prerequisites.
1797
1798     It checks that the given list of nodes is valid.
1799
1800     """
1801     self.nodes = _GetWantedNodes(self, self.op.nodes)
1802
1803   def Exec(self, feedback_fn):
1804     """Run a command on some nodes.
1805
1806     """
1807     data = []
1808     for node in self.nodes:
1809       result = ssh.SSHCall(node, "root", self.op.command)
1810       data.append((node, result.output, result.exit_code))
1811
1812     return data
1813
1814
1815 class LUActivateInstanceDisks(NoHooksLU):
1816   """Bring up an instance's disks.
1817
1818   """
1819   _OP_REQP = ["instance_name"]
1820
1821   def CheckPrereq(self):
1822     """Check prerequisites.
1823
1824     This checks that the instance is in the cluster.
1825
1826     """
1827     instance = self.cfg.GetInstanceInfo(
1828       self.cfg.ExpandInstanceName(self.op.instance_name))
1829     if instance is None:
1830       raise errors.OpPrereqError("Instance '%s' not known" %
1831                                  self.op.instance_name)
1832     self.instance = instance
1833
1834
1835   def Exec(self, feedback_fn):
1836     """Activate the disks.
1837
1838     """
1839     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1840     if not disks_ok:
1841       raise errors.OpExecError("Cannot activate block devices")
1842
1843     return disks_info
1844
1845
1846 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1847   """Prepare the block devices for an instance.
1848
1849   This sets up the block devices on all nodes.
1850
1851   Args:
1852     instance: a ganeti.objects.Instance object
1853     ignore_secondaries: if true, errors on secondary nodes won't result
1854                         in an error return from the function
1855
1856   Returns:
1857     false if the operation failed
1858     list of (host, instance_visible_name, node_visible_name) if the operation
1859          suceeded with the mapping from node devices to instance devices
1860   """
1861   device_info = []
1862   disks_ok = True
1863   iname = instance.name
1864   # With the two passes mechanism we try to reduce the window of
1865   # opportunity for the race condition of switching DRBD to primary
1866   # before handshaking occured, but we do not eliminate it
1867
1868   # The proper fix would be to wait (with some limits) until the
1869   # connection has been made and drbd transitions from WFConnection
1870   # into any other network-connected state (Connected, SyncTarget,
1871   # SyncSource, etc.)
1872
1873   # 1st pass, assemble on all nodes in secondary mode
1874   for inst_disk in instance.disks:
1875     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1876       cfg.SetDiskID(node_disk, node)
1877       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1878       if not result:
1879         logger.Error("could not prepare block device %s on node %s"
1880                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1881         if not ignore_secondaries:
1882           disks_ok = False
1883
1884   # FIXME: race condition on drbd migration to primary
1885
1886   # 2nd pass, do only the primary node
1887   for inst_disk in instance.disks:
1888     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1889       if node != instance.primary_node:
1890         continue
1891       cfg.SetDiskID(node_disk, node)
1892       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1893       if not result:
1894         logger.Error("could not prepare block device %s on node %s"
1895                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1896         disks_ok = False
1897     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1898
1899   # leave the disks configured for the primary node
1900   # this is a workaround that would be fixed better by
1901   # improving the logical/physical id handling
1902   for disk in instance.disks:
1903     cfg.SetDiskID(disk, instance.primary_node)
1904
1905   return disks_ok, device_info
1906
1907
1908 def _StartInstanceDisks(cfg, instance, force):
1909   """Start the disks of an instance.
1910
1911   """
1912   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1913                                            ignore_secondaries=force)
1914   if not disks_ok:
1915     _ShutdownInstanceDisks(instance, cfg)
1916     if force is not None and not force:
1917       logger.Error("If the message above refers to a secondary node,"
1918                    " you can retry the operation using '--force'.")
1919     raise errors.OpExecError("Disk consistency error")
1920
1921
1922 class LUDeactivateInstanceDisks(NoHooksLU):
1923   """Shutdown an instance's disks.
1924
1925   """
1926   _OP_REQP = ["instance_name"]
1927
1928   def CheckPrereq(self):
1929     """Check prerequisites.
1930
1931     This checks that the instance is in the cluster.
1932
1933     """
1934     instance = self.cfg.GetInstanceInfo(
1935       self.cfg.ExpandInstanceName(self.op.instance_name))
1936     if instance is None:
1937       raise errors.OpPrereqError("Instance '%s' not known" %
1938                                  self.op.instance_name)
1939     self.instance = instance
1940
1941   def Exec(self, feedback_fn):
1942     """Deactivate the disks
1943
1944     """
1945     instance = self.instance
1946     ins_l = rpc.call_instance_list([instance.primary_node])
1947     ins_l = ins_l[instance.primary_node]
1948     if not type(ins_l) is list:
1949       raise errors.OpExecError("Can't contact node '%s'" %
1950                                instance.primary_node)
1951
1952     if self.instance.name in ins_l:
1953       raise errors.OpExecError("Instance is running, can't shutdown"
1954                                " block devices.")
1955
1956     _ShutdownInstanceDisks(instance, self.cfg)
1957
1958
1959 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1960   """Shutdown block devices of an instance.
1961
1962   This does the shutdown on all nodes of the instance.
1963
1964   If the ignore_primary is false, errors on the primary node are
1965   ignored.
1966
1967   """
1968   result = True
1969   for disk in instance.disks:
1970     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1971       cfg.SetDiskID(top_disk, node)
1972       if not rpc.call_blockdev_shutdown(node, top_disk):
1973         logger.Error("could not shutdown block device %s on node %s" %
1974                      (disk.iv_name, node))
1975         if not ignore_primary or node != instance.primary_node:
1976           result = False
1977   return result
1978
1979
1980 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1981   """Checks if a node has enough free memory.
1982
1983   This function check if a given node has the needed amount of free
1984   memory. In case the node has less memory or we cannot get the
1985   information from the node, this function raise an OpPrereqError
1986   exception.
1987
1988   Args:
1989     - cfg: a ConfigWriter instance
1990     - node: the node name
1991     - reason: string to use in the error message
1992     - requested: the amount of memory in MiB
1993
1994   """
1995   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1996   if not nodeinfo or not isinstance(nodeinfo, dict):
1997     raise errors.OpPrereqError("Could not contact node %s for resource"
1998                              " information" % (node,))
1999
2000   free_mem = nodeinfo[node].get('memory_free')
2001   if not isinstance(free_mem, int):
2002     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2003                              " was '%s'" % (node, free_mem))
2004   if requested > free_mem:
2005     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2006                              " needed %s MiB, available %s MiB" %
2007                              (node, reason, requested, free_mem))
2008
2009
2010 class LUStartupInstance(LogicalUnit):
2011   """Starts an instance.
2012
2013   """
2014   HPATH = "instance-start"
2015   HTYPE = constants.HTYPE_INSTANCE
2016   _OP_REQP = ["instance_name", "force"]
2017
2018   def BuildHooksEnv(self):
2019     """Build hooks env.
2020
2021     This runs on master, primary and secondary nodes of the instance.
2022
2023     """
2024     env = {
2025       "FORCE": self.op.force,
2026       }
2027     env.update(_BuildInstanceHookEnvByObject(self.instance))
2028     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2029           list(self.instance.secondary_nodes))
2030     return env, nl, nl
2031
2032   def CheckPrereq(self):
2033     """Check prerequisites.
2034
2035     This checks that the instance is in the cluster.
2036
2037     """
2038     instance = self.cfg.GetInstanceInfo(
2039       self.cfg.ExpandInstanceName(self.op.instance_name))
2040     if instance is None:
2041       raise errors.OpPrereqError("Instance '%s' not known" %
2042                                  self.op.instance_name)
2043
2044     # check bridges existance
2045     _CheckInstanceBridgesExist(instance)
2046
2047     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2048                          "starting instance %s" % instance.name,
2049                          instance.memory)
2050
2051     self.instance = instance
2052     self.op.instance_name = instance.name
2053
2054   def Exec(self, feedback_fn):
2055     """Start the instance.
2056
2057     """
2058     instance = self.instance
2059     force = self.op.force
2060     extra_args = getattr(self.op, "extra_args", "")
2061
2062     node_current = instance.primary_node
2063
2064     _StartInstanceDisks(self.cfg, instance, force)
2065
2066     if not rpc.call_instance_start(node_current, instance, extra_args):
2067       _ShutdownInstanceDisks(instance, self.cfg)
2068       raise errors.OpExecError("Could not start instance")
2069
2070     self.cfg.MarkInstanceUp(instance.name)
2071
2072
2073 class LURebootInstance(LogicalUnit):
2074   """Reboot an instance.
2075
2076   """
2077   HPATH = "instance-reboot"
2078   HTYPE = constants.HTYPE_INSTANCE
2079   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2080
2081   def BuildHooksEnv(self):
2082     """Build hooks env.
2083
2084     This runs on master, primary and secondary nodes of the instance.
2085
2086     """
2087     env = {
2088       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2089       }
2090     env.update(_BuildInstanceHookEnvByObject(self.instance))
2091     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2092           list(self.instance.secondary_nodes))
2093     return env, nl, nl
2094
2095   def CheckPrereq(self):
2096     """Check prerequisites.
2097
2098     This checks that the instance is in the cluster.
2099
2100     """
2101     instance = self.cfg.GetInstanceInfo(
2102       self.cfg.ExpandInstanceName(self.op.instance_name))
2103     if instance is None:
2104       raise errors.OpPrereqError("Instance '%s' not known" %
2105                                  self.op.instance_name)
2106
2107     # check bridges existance
2108     _CheckInstanceBridgesExist(instance)
2109
2110     self.instance = instance
2111     self.op.instance_name = instance.name
2112
2113   def Exec(self, feedback_fn):
2114     """Reboot the instance.
2115
2116     """
2117     instance = self.instance
2118     ignore_secondaries = self.op.ignore_secondaries
2119     reboot_type = self.op.reboot_type
2120     extra_args = getattr(self.op, "extra_args", "")
2121
2122     node_current = instance.primary_node
2123
2124     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2125                            constants.INSTANCE_REBOOT_HARD,
2126                            constants.INSTANCE_REBOOT_FULL]:
2127       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2128                                   (constants.INSTANCE_REBOOT_SOFT,
2129                                    constants.INSTANCE_REBOOT_HARD,
2130                                    constants.INSTANCE_REBOOT_FULL))
2131
2132     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2133                        constants.INSTANCE_REBOOT_HARD]:
2134       if not rpc.call_instance_reboot(node_current, instance,
2135                                       reboot_type, extra_args):
2136         raise errors.OpExecError("Could not reboot instance")
2137     else:
2138       if not rpc.call_instance_shutdown(node_current, instance):
2139         raise errors.OpExecError("could not shutdown instance for full reboot")
2140       _ShutdownInstanceDisks(instance, self.cfg)
2141       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2142       if not rpc.call_instance_start(node_current, instance, extra_args):
2143         _ShutdownInstanceDisks(instance, self.cfg)
2144         raise errors.OpExecError("Could not start instance for full reboot")
2145
2146     self.cfg.MarkInstanceUp(instance.name)
2147
2148
2149 class LUShutdownInstance(LogicalUnit):
2150   """Shutdown an instance.
2151
2152   """
2153   HPATH = "instance-stop"
2154   HTYPE = constants.HTYPE_INSTANCE
2155   _OP_REQP = ["instance_name"]
2156
2157   def BuildHooksEnv(self):
2158     """Build hooks env.
2159
2160     This runs on master, primary and secondary nodes of the instance.
2161
2162     """
2163     env = _BuildInstanceHookEnvByObject(self.instance)
2164     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2165           list(self.instance.secondary_nodes))
2166     return env, nl, nl
2167
2168   def CheckPrereq(self):
2169     """Check prerequisites.
2170
2171     This checks that the instance is in the cluster.
2172
2173     """
2174     instance = self.cfg.GetInstanceInfo(
2175       self.cfg.ExpandInstanceName(self.op.instance_name))
2176     if instance is None:
2177       raise errors.OpPrereqError("Instance '%s' not known" %
2178                                  self.op.instance_name)
2179     self.instance = instance
2180
2181   def Exec(self, feedback_fn):
2182     """Shutdown the instance.
2183
2184     """
2185     instance = self.instance
2186     node_current = instance.primary_node
2187     if not rpc.call_instance_shutdown(node_current, instance):
2188       logger.Error("could not shutdown instance")
2189
2190     self.cfg.MarkInstanceDown(instance.name)
2191     _ShutdownInstanceDisks(instance, self.cfg)
2192
2193
2194 class LUReinstallInstance(LogicalUnit):
2195   """Reinstall an instance.
2196
2197   """
2198   HPATH = "instance-reinstall"
2199   HTYPE = constants.HTYPE_INSTANCE
2200   _OP_REQP = ["instance_name"]
2201
2202   def BuildHooksEnv(self):
2203     """Build hooks env.
2204
2205     This runs on master, primary and secondary nodes of the instance.
2206
2207     """
2208     env = _BuildInstanceHookEnvByObject(self.instance)
2209     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2210           list(self.instance.secondary_nodes))
2211     return env, nl, nl
2212
2213   def CheckPrereq(self):
2214     """Check prerequisites.
2215
2216     This checks that the instance is in the cluster and is not running.
2217
2218     """
2219     instance = self.cfg.GetInstanceInfo(
2220       self.cfg.ExpandInstanceName(self.op.instance_name))
2221     if instance is None:
2222       raise errors.OpPrereqError("Instance '%s' not known" %
2223                                  self.op.instance_name)
2224     if instance.disk_template == constants.DT_DISKLESS:
2225       raise errors.OpPrereqError("Instance '%s' has no disks" %
2226                                  self.op.instance_name)
2227     if instance.status != "down":
2228       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2229                                  self.op.instance_name)
2230     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2231     if remote_info:
2232       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2233                                  (self.op.instance_name,
2234                                   instance.primary_node))
2235
2236     self.op.os_type = getattr(self.op, "os_type", None)
2237     if self.op.os_type is not None:
2238       # OS verification
2239       pnode = self.cfg.GetNodeInfo(
2240         self.cfg.ExpandNodeName(instance.primary_node))
2241       if pnode is None:
2242         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2243                                    self.op.pnode)
2244       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2245       if not os_obj:
2246         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2247                                    " primary node"  % self.op.os_type)
2248
2249     self.instance = instance
2250
2251   def Exec(self, feedback_fn):
2252     """Reinstall the instance.
2253
2254     """
2255     inst = self.instance
2256
2257     if self.op.os_type is not None:
2258       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2259       inst.os = self.op.os_type
2260       self.cfg.AddInstance(inst)
2261
2262     _StartInstanceDisks(self.cfg, inst, None)
2263     try:
2264       feedback_fn("Running the instance OS create scripts...")
2265       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2266         raise errors.OpExecError("Could not install OS for instance %s"
2267                                  " on node %s" %
2268                                  (inst.name, inst.primary_node))
2269     finally:
2270       _ShutdownInstanceDisks(inst, self.cfg)
2271
2272
2273 class LURenameInstance(LogicalUnit):
2274   """Rename an instance.
2275
2276   """
2277   HPATH = "instance-rename"
2278   HTYPE = constants.HTYPE_INSTANCE
2279   _OP_REQP = ["instance_name", "new_name"]
2280
2281   def BuildHooksEnv(self):
2282     """Build hooks env.
2283
2284     This runs on master, primary and secondary nodes of the instance.
2285
2286     """
2287     env = _BuildInstanceHookEnvByObject(self.instance)
2288     env["INSTANCE_NEW_NAME"] = self.op.new_name
2289     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2290           list(self.instance.secondary_nodes))
2291     return env, nl, nl
2292
2293   def CheckPrereq(self):
2294     """Check prerequisites.
2295
2296     This checks that the instance is in the cluster and is not running.
2297
2298     """
2299     instance = self.cfg.GetInstanceInfo(
2300       self.cfg.ExpandInstanceName(self.op.instance_name))
2301     if instance is None:
2302       raise errors.OpPrereqError("Instance '%s' not known" %
2303                                  self.op.instance_name)
2304     if instance.status != "down":
2305       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2306                                  self.op.instance_name)
2307     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2308     if remote_info:
2309       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2310                                  (self.op.instance_name,
2311                                   instance.primary_node))
2312     self.instance = instance
2313
2314     # new name verification
2315     name_info = utils.HostInfo(self.op.new_name)
2316
2317     self.op.new_name = new_name = name_info.name
2318     instance_list = self.cfg.GetInstanceList()
2319     if new_name in instance_list:
2320       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2321                                  instance_name)
2322
2323     if not getattr(self.op, "ignore_ip", False):
2324       command = ["fping", "-q", name_info.ip]
2325       result = utils.RunCmd(command)
2326       if not result.failed:
2327         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2328                                    (name_info.ip, new_name))
2329
2330
2331   def Exec(self, feedback_fn):
2332     """Reinstall the instance.
2333
2334     """
2335     inst = self.instance
2336     old_name = inst.name
2337
2338     self.cfg.RenameInstance(inst.name, self.op.new_name)
2339
2340     # re-read the instance from the configuration after rename
2341     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2342
2343     _StartInstanceDisks(self.cfg, inst, None)
2344     try:
2345       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2346                                           "sda", "sdb"):
2347         msg = ("Could run OS rename script for instance %s on node %s (but the"
2348                " instance has been renamed in Ganeti)" %
2349                (inst.name, inst.primary_node))
2350         logger.Error(msg)
2351     finally:
2352       _ShutdownInstanceDisks(inst, self.cfg)
2353
2354
2355 class LURemoveInstance(LogicalUnit):
2356   """Remove an instance.
2357
2358   """
2359   HPATH = "instance-remove"
2360   HTYPE = constants.HTYPE_INSTANCE
2361   _OP_REQP = ["instance_name"]
2362
2363   def BuildHooksEnv(self):
2364     """Build hooks env.
2365
2366     This runs on master, primary and secondary nodes of the instance.
2367
2368     """
2369     env = _BuildInstanceHookEnvByObject(self.instance)
2370     nl = [self.sstore.GetMasterNode()]
2371     return env, nl, nl
2372
2373   def CheckPrereq(self):
2374     """Check prerequisites.
2375
2376     This checks that the instance is in the cluster.
2377
2378     """
2379     instance = self.cfg.GetInstanceInfo(
2380       self.cfg.ExpandInstanceName(self.op.instance_name))
2381     if instance is None:
2382       raise errors.OpPrereqError("Instance '%s' not known" %
2383                                  self.op.instance_name)
2384     self.instance = instance
2385
2386   def Exec(self, feedback_fn):
2387     """Remove the instance.
2388
2389     """
2390     instance = self.instance
2391     logger.Info("shutting down instance %s on node %s" %
2392                 (instance.name, instance.primary_node))
2393
2394     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2395       if self.op.ignore_failures:
2396         feedback_fn("Warning: can't shutdown instance")
2397       else:
2398         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2399                                  (instance.name, instance.primary_node))
2400
2401     logger.Info("removing block devices for instance %s" % instance.name)
2402
2403     if not _RemoveDisks(instance, self.cfg):
2404       if self.op.ignore_failures:
2405         feedback_fn("Warning: can't remove instance's disks")
2406       else:
2407         raise errors.OpExecError("Can't remove instance's disks")
2408
2409     logger.Info("removing instance %s out of cluster config" % instance.name)
2410
2411     self.cfg.RemoveInstance(instance.name)
2412
2413
2414 class LUQueryInstances(NoHooksLU):
2415   """Logical unit for querying instances.
2416
2417   """
2418   _OP_REQP = ["output_fields", "names"]
2419
2420   def CheckPrereq(self):
2421     """Check prerequisites.
2422
2423     This checks that the fields required are valid output fields.
2424
2425     """
2426     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2427     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2428                                "admin_state", "admin_ram",
2429                                "disk_template", "ip", "mac", "bridge",
2430                                "sda_size", "sdb_size", "vcpus"],
2431                        dynamic=self.dynamic_fields,
2432                        selected=self.op.output_fields)
2433
2434     self.wanted = _GetWantedInstances(self, self.op.names)
2435
2436   def Exec(self, feedback_fn):
2437     """Computes the list of nodes and their attributes.
2438
2439     """
2440     instance_names = self.wanted
2441     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2442                      in instance_names]
2443
2444     # begin data gathering
2445
2446     nodes = frozenset([inst.primary_node for inst in instance_list])
2447
2448     bad_nodes = []
2449     if self.dynamic_fields.intersection(self.op.output_fields):
2450       live_data = {}
2451       node_data = rpc.call_all_instances_info(nodes)
2452       for name in nodes:
2453         result = node_data[name]
2454         if result:
2455           live_data.update(result)
2456         elif result == False:
2457           bad_nodes.append(name)
2458         # else no instance is alive
2459     else:
2460       live_data = dict([(name, {}) for name in instance_names])
2461
2462     # end data gathering
2463
2464     output = []
2465     for instance in instance_list:
2466       iout = []
2467       for field in self.op.output_fields:
2468         if field == "name":
2469           val = instance.name
2470         elif field == "os":
2471           val = instance.os
2472         elif field == "pnode":
2473           val = instance.primary_node
2474         elif field == "snodes":
2475           val = list(instance.secondary_nodes)
2476         elif field == "admin_state":
2477           val = (instance.status != "down")
2478         elif field == "oper_state":
2479           if instance.primary_node in bad_nodes:
2480             val = None
2481           else:
2482             val = bool(live_data.get(instance.name))
2483         elif field == "status":
2484           if instance.primary_node in bad_nodes:
2485             val = "ERROR_nodedown"
2486           else:
2487             running = bool(live_data.get(instance.name))
2488             if running:
2489               if instance.status != "down":
2490                 val = "running"
2491               else:
2492                 val = "ERROR_up"
2493             else:
2494               if instance.status != "down":
2495                 val = "ERROR_down"
2496               else:
2497                 val = "ADMIN_down"
2498         elif field == "admin_ram":
2499           val = instance.memory
2500         elif field == "oper_ram":
2501           if instance.primary_node in bad_nodes:
2502             val = None
2503           elif instance.name in live_data:
2504             val = live_data[instance.name].get("memory", "?")
2505           else:
2506             val = "-"
2507         elif field == "disk_template":
2508           val = instance.disk_template
2509         elif field == "ip":
2510           val = instance.nics[0].ip
2511         elif field == "bridge":
2512           val = instance.nics[0].bridge
2513         elif field == "mac":
2514           val = instance.nics[0].mac
2515         elif field == "sda_size" or field == "sdb_size":
2516           disk = instance.FindDisk(field[:3])
2517           if disk is None:
2518             val = None
2519           else:
2520             val = disk.size
2521         elif field == "vcpus":
2522           val = instance.vcpus
2523         else:
2524           raise errors.ParameterError(field)
2525         iout.append(val)
2526       output.append(iout)
2527
2528     return output
2529
2530
2531 class LUFailoverInstance(LogicalUnit):
2532   """Failover an instance.
2533
2534   """
2535   HPATH = "instance-failover"
2536   HTYPE = constants.HTYPE_INSTANCE
2537   _OP_REQP = ["instance_name", "ignore_consistency"]
2538
2539   def BuildHooksEnv(self):
2540     """Build hooks env.
2541
2542     This runs on master, primary and secondary nodes of the instance.
2543
2544     """
2545     env = {
2546       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2547       }
2548     env.update(_BuildInstanceHookEnvByObject(self.instance))
2549     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2550     return env, nl, nl
2551
2552   def CheckPrereq(self):
2553     """Check prerequisites.
2554
2555     This checks that the instance is in the cluster.
2556
2557     """
2558     instance = self.cfg.GetInstanceInfo(
2559       self.cfg.ExpandInstanceName(self.op.instance_name))
2560     if instance is None:
2561       raise errors.OpPrereqError("Instance '%s' not known" %
2562                                  self.op.instance_name)
2563
2564     if instance.disk_template not in constants.DTS_NET_MIRROR:
2565       raise errors.OpPrereqError("Instance's disk layout is not"
2566                                  " network mirrored, cannot failover.")
2567
2568     secondary_nodes = instance.secondary_nodes
2569     if not secondary_nodes:
2570       raise errors.ProgrammerError("no secondary node but using "
2571                                    "DT_REMOTE_RAID1 template")
2572
2573     target_node = secondary_nodes[0]
2574     # check memory requirements on the secondary node
2575     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2576                          instance.name, instance.memory)
2577
2578     # check bridge existance
2579     brlist = [nic.bridge for nic in instance.nics]
2580     if not rpc.call_bridges_exist(target_node, brlist):
2581       raise errors.OpPrereqError("One or more target bridges %s does not"
2582                                  " exist on destination node '%s'" %
2583                                  (brlist, target_node))
2584
2585     self.instance = instance
2586
2587   def Exec(self, feedback_fn):
2588     """Failover an instance.
2589
2590     The failover is done by shutting it down on its present node and
2591     starting it on the secondary.
2592
2593     """
2594     instance = self.instance
2595
2596     source_node = instance.primary_node
2597     target_node = instance.secondary_nodes[0]
2598
2599     feedback_fn("* checking disk consistency between source and target")
2600     for dev in instance.disks:
2601       # for remote_raid1, these are md over drbd
2602       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2603         if not self.op.ignore_consistency:
2604           raise errors.OpExecError("Disk %s is degraded on target node,"
2605                                    " aborting failover." % dev.iv_name)
2606
2607     feedback_fn("* shutting down instance on source node")
2608     logger.Info("Shutting down instance %s on node %s" %
2609                 (instance.name, source_node))
2610
2611     if not rpc.call_instance_shutdown(source_node, instance):
2612       if self.op.ignore_consistency:
2613         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2614                      " anyway. Please make sure node %s is down"  %
2615                      (instance.name, source_node, source_node))
2616       else:
2617         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2618                                  (instance.name, source_node))
2619
2620     feedback_fn("* deactivating the instance's disks on source node")
2621     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2622       raise errors.OpExecError("Can't shut down the instance's disks.")
2623
2624     instance.primary_node = target_node
2625     # distribute new instance config to the other nodes
2626     self.cfg.AddInstance(instance)
2627
2628     feedback_fn("* activating the instance's disks on target node")
2629     logger.Info("Starting instance %s on node %s" %
2630                 (instance.name, target_node))
2631
2632     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2633                                              ignore_secondaries=True)
2634     if not disks_ok:
2635       _ShutdownInstanceDisks(instance, self.cfg)
2636       raise errors.OpExecError("Can't activate the instance's disks")
2637
2638     feedback_fn("* starting the instance on the target node")
2639     if not rpc.call_instance_start(target_node, instance, None):
2640       _ShutdownInstanceDisks(instance, self.cfg)
2641       raise errors.OpExecError("Could not start instance %s on node %s." %
2642                                (instance.name, target_node))
2643
2644
2645 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2646   """Create a tree of block devices on the primary node.
2647
2648   This always creates all devices.
2649
2650   """
2651   if device.children:
2652     for child in device.children:
2653       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2654         return False
2655
2656   cfg.SetDiskID(device, node)
2657   new_id = rpc.call_blockdev_create(node, device, device.size,
2658                                     instance.name, True, info)
2659   if not new_id:
2660     return False
2661   if device.physical_id is None:
2662     device.physical_id = new_id
2663   return True
2664
2665
2666 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2667   """Create a tree of block devices on a secondary node.
2668
2669   If this device type has to be created on secondaries, create it and
2670   all its children.
2671
2672   If not, just recurse to children keeping the same 'force' value.
2673
2674   """
2675   if device.CreateOnSecondary():
2676     force = True
2677   if device.children:
2678     for child in device.children:
2679       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2680                                         child, force, info):
2681         return False
2682
2683   if not force:
2684     return True
2685   cfg.SetDiskID(device, node)
2686   new_id = rpc.call_blockdev_create(node, device, device.size,
2687                                     instance.name, False, info)
2688   if not new_id:
2689     return False
2690   if device.physical_id is None:
2691     device.physical_id = new_id
2692   return True
2693
2694
2695 def _GenerateUniqueNames(cfg, exts):
2696   """Generate a suitable LV name.
2697
2698   This will generate a logical volume name for the given instance.
2699
2700   """
2701   results = []
2702   for val in exts:
2703     new_id = cfg.GenerateUniqueID()
2704     results.append("%s%s" % (new_id, val))
2705   return results
2706
2707
2708 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2709   """Generate a drbd device complete with its children.
2710
2711   """
2712   port = cfg.AllocatePort()
2713   vgname = cfg.GetVGName()
2714   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2715                           logical_id=(vgname, names[0]))
2716   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2717                           logical_id=(vgname, names[1]))
2718   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2719                           logical_id = (primary, secondary, port),
2720                           children = [dev_data, dev_meta])
2721   return drbd_dev
2722
2723
2724 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2725   """Generate a drbd8 device complete with its children.
2726
2727   """
2728   port = cfg.AllocatePort()
2729   vgname = cfg.GetVGName()
2730   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2731                           logical_id=(vgname, names[0]))
2732   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2733                           logical_id=(vgname, names[1]))
2734   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2735                           logical_id = (primary, secondary, port),
2736                           children = [dev_data, dev_meta],
2737                           iv_name=iv_name)
2738   return drbd_dev
2739
2740 def _GenerateDiskTemplate(cfg, template_name,
2741                           instance_name, primary_node,
2742                           secondary_nodes, disk_sz, swap_sz):
2743   """Generate the entire disk layout for a given template type.
2744
2745   """
2746   #TODO: compute space requirements
2747
2748   vgname = cfg.GetVGName()
2749   if template_name == "diskless":
2750     disks = []
2751   elif template_name == "plain":
2752     if len(secondary_nodes) != 0:
2753       raise errors.ProgrammerError("Wrong template configuration")
2754
2755     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2756     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2757                            logical_id=(vgname, names[0]),
2758                            iv_name = "sda")
2759     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2760                            logical_id=(vgname, names[1]),
2761                            iv_name = "sdb")
2762     disks = [sda_dev, sdb_dev]
2763   elif template_name == "local_raid1":
2764     if len(secondary_nodes) != 0:
2765       raise errors.ProgrammerError("Wrong template configuration")
2766
2767
2768     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2769                                        ".sdb_m1", ".sdb_m2"])
2770     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2771                               logical_id=(vgname, names[0]))
2772     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2773                               logical_id=(vgname, names[1]))
2774     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2775                               size=disk_sz,
2776                               children = [sda_dev_m1, sda_dev_m2])
2777     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2778                               logical_id=(vgname, names[2]))
2779     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2780                               logical_id=(vgname, names[3]))
2781     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2782                               size=swap_sz,
2783                               children = [sdb_dev_m1, sdb_dev_m2])
2784     disks = [md_sda_dev, md_sdb_dev]
2785   elif template_name == constants.DT_REMOTE_RAID1:
2786     if len(secondary_nodes) != 1:
2787       raise errors.ProgrammerError("Wrong template configuration")
2788     remote_node = secondary_nodes[0]
2789     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2790                                        ".sdb_data", ".sdb_meta"])
2791     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2792                                          disk_sz, names[0:2])
2793     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2794                               children = [drbd_sda_dev], size=disk_sz)
2795     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2796                                          swap_sz, names[2:4])
2797     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2798                               children = [drbd_sdb_dev], size=swap_sz)
2799     disks = [md_sda_dev, md_sdb_dev]
2800   elif template_name == constants.DT_DRBD8:
2801     if len(secondary_nodes) != 1:
2802       raise errors.ProgrammerError("Wrong template configuration")
2803     remote_node = secondary_nodes[0]
2804     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2805                                        ".sdb_data", ".sdb_meta"])
2806     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2807                                          disk_sz, names[0:2], "sda")
2808     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2809                                          swap_sz, names[2:4], "sdb")
2810     disks = [drbd_sda_dev, drbd_sdb_dev]
2811   else:
2812     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2813   return disks
2814
2815
2816 def _GetInstanceInfoText(instance):
2817   """Compute that text that should be added to the disk's metadata.
2818
2819   """
2820   return "originstname+%s" % instance.name
2821
2822
2823 def _CreateDisks(cfg, instance):
2824   """Create all disks for an instance.
2825
2826   This abstracts away some work from AddInstance.
2827
2828   Args:
2829     instance: the instance object
2830
2831   Returns:
2832     True or False showing the success of the creation process
2833
2834   """
2835   info = _GetInstanceInfoText(instance)
2836
2837   for device in instance.disks:
2838     logger.Info("creating volume %s for instance %s" %
2839               (device.iv_name, instance.name))
2840     #HARDCODE
2841     for secondary_node in instance.secondary_nodes:
2842       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2843                                         device, False, info):
2844         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2845                      (device.iv_name, device, secondary_node))
2846         return False
2847     #HARDCODE
2848     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2849                                     instance, device, info):
2850       logger.Error("failed to create volume %s on primary!" %
2851                    device.iv_name)
2852       return False
2853   return True
2854
2855
2856 def _RemoveDisks(instance, cfg):
2857   """Remove all disks for an instance.
2858
2859   This abstracts away some work from `AddInstance()` and
2860   `RemoveInstance()`. Note that in case some of the devices couldn't
2861   be removed, the removal will continue with the other ones (compare
2862   with `_CreateDisks()`).
2863
2864   Args:
2865     instance: the instance object
2866
2867   Returns:
2868     True or False showing the success of the removal proces
2869
2870   """
2871   logger.Info("removing block devices for instance %s" % instance.name)
2872
2873   result = True
2874   for device in instance.disks:
2875     for node, disk in device.ComputeNodeTree(instance.primary_node):
2876       cfg.SetDiskID(disk, node)
2877       if not rpc.call_blockdev_remove(node, disk):
2878         logger.Error("could not remove block device %s on node %s,"
2879                      " continuing anyway" %
2880                      (device.iv_name, node))
2881         result = False
2882   return result
2883
2884
2885 class LUCreateInstance(LogicalUnit):
2886   """Create an instance.
2887
2888   """
2889   HPATH = "instance-add"
2890   HTYPE = constants.HTYPE_INSTANCE
2891   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2892               "disk_template", "swap_size", "mode", "start", "vcpus",
2893               "wait_for_sync", "ip_check", "mac"]
2894
2895   def BuildHooksEnv(self):
2896     """Build hooks env.
2897
2898     This runs on master, primary and secondary nodes of the instance.
2899
2900     """
2901     env = {
2902       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2903       "INSTANCE_DISK_SIZE": self.op.disk_size,
2904       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2905       "INSTANCE_ADD_MODE": self.op.mode,
2906       }
2907     if self.op.mode == constants.INSTANCE_IMPORT:
2908       env["INSTANCE_SRC_NODE"] = self.op.src_node
2909       env["INSTANCE_SRC_PATH"] = self.op.src_path
2910       env["INSTANCE_SRC_IMAGE"] = self.src_image
2911
2912     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2913       primary_node=self.op.pnode,
2914       secondary_nodes=self.secondaries,
2915       status=self.instance_status,
2916       os_type=self.op.os_type,
2917       memory=self.op.mem_size,
2918       vcpus=self.op.vcpus,
2919       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2920     ))
2921
2922     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2923           self.secondaries)
2924     return env, nl, nl
2925
2926
2927   def CheckPrereq(self):
2928     """Check prerequisites.
2929
2930     """
2931     for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2932       if not hasattr(self.op, attr):
2933         setattr(self.op, attr, None)
2934
2935     if self.op.mode not in (constants.INSTANCE_CREATE,
2936                             constants.INSTANCE_IMPORT):
2937       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2938                                  self.op.mode)
2939
2940     if self.op.mode == constants.INSTANCE_IMPORT:
2941       src_node = getattr(self.op, "src_node", None)
2942       src_path = getattr(self.op, "src_path", None)
2943       if src_node is None or src_path is None:
2944         raise errors.OpPrereqError("Importing an instance requires source"
2945                                    " node and path options")
2946       src_node_full = self.cfg.ExpandNodeName(src_node)
2947       if src_node_full is None:
2948         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2949       self.op.src_node = src_node = src_node_full
2950
2951       if not os.path.isabs(src_path):
2952         raise errors.OpPrereqError("The source path must be absolute")
2953
2954       export_info = rpc.call_export_info(src_node, src_path)
2955
2956       if not export_info:
2957         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2958
2959       if not export_info.has_section(constants.INISECT_EXP):
2960         raise errors.ProgrammerError("Corrupted export config")
2961
2962       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2963       if (int(ei_version) != constants.EXPORT_VERSION):
2964         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2965                                    (ei_version, constants.EXPORT_VERSION))
2966
2967       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2968         raise errors.OpPrereqError("Can't import instance with more than"
2969                                    " one data disk")
2970
2971       # FIXME: are the old os-es, disk sizes, etc. useful?
2972       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2973       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2974                                                          'disk0_dump'))
2975       self.src_image = diskimage
2976     else: # INSTANCE_CREATE
2977       if getattr(self.op, "os_type", None) is None:
2978         raise errors.OpPrereqError("No guest OS specified")
2979
2980     # check primary node
2981     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2982     if pnode is None:
2983       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2984                                  self.op.pnode)
2985     self.op.pnode = pnode.name
2986     self.pnode = pnode
2987     self.secondaries = []
2988     # disk template and mirror node verification
2989     if self.op.disk_template not in constants.DISK_TEMPLATES:
2990       raise errors.OpPrereqError("Invalid disk template name")
2991
2992     if self.op.disk_template in constants.DTS_NET_MIRROR:
2993       if getattr(self.op, "snode", None) is None:
2994         raise errors.OpPrereqError("The networked disk templates need"
2995                                    " a mirror node")
2996
2997       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2998       if snode_name is None:
2999         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3000                                    self.op.snode)
3001       elif snode_name == pnode.name:
3002         raise errors.OpPrereqError("The secondary node cannot be"
3003                                    " the primary node.")
3004       self.secondaries.append(snode_name)
3005
3006     # Required free disk space as a function of disk and swap space
3007     req_size_dict = {
3008       constants.DT_DISKLESS: None,
3009       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
3010       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
3011       # 256 MB are added for drbd metadata, 128MB for each drbd device
3012       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
3013       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
3014     }
3015
3016     if self.op.disk_template not in req_size_dict:
3017       raise errors.ProgrammerError("Disk template '%s' size requirement"
3018                                    " is unknown" %  self.op.disk_template)
3019
3020     req_size = req_size_dict[self.op.disk_template]
3021
3022     # Check lv size requirements
3023     if req_size is not None:
3024       nodenames = [pnode.name] + self.secondaries
3025       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3026       for node in nodenames:
3027         info = nodeinfo.get(node, None)
3028         if not info:
3029           raise errors.OpPrereqError("Cannot get current information"
3030                                      " from node '%s'" % nodeinfo)
3031         vg_free = info.get('vg_free', None)
3032         if not isinstance(vg_free, int):
3033           raise errors.OpPrereqError("Can't compute free disk space on"
3034                                      " node %s" % node)
3035         if req_size > info['vg_free']:
3036           raise errors.OpPrereqError("Not enough disk space on target node %s."
3037                                      " %d MB available, %d MB required" %
3038                                      (node, info['vg_free'], req_size))
3039
3040     # os verification
3041     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3042     if not os_obj:
3043       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3044                                  " primary node"  % self.op.os_type)
3045
3046     if self.op.kernel_path == constants.VALUE_NONE:
3047       raise errors.OpPrereqError("Can't set instance kernel to none")
3048
3049     # instance verification
3050     hostname1 = utils.HostInfo(self.op.instance_name)
3051
3052     self.op.instance_name = instance_name = hostname1.name
3053     instance_list = self.cfg.GetInstanceList()
3054     if instance_name in instance_list:
3055       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3056                                  instance_name)
3057
3058     ip = getattr(self.op, "ip", None)
3059     if ip is None or ip.lower() == "none":
3060       inst_ip = None
3061     elif ip.lower() == "auto":
3062       inst_ip = hostname1.ip
3063     else:
3064       if not utils.IsValidIP(ip):
3065         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3066                                    " like a valid IP" % ip)
3067       inst_ip = ip
3068     self.inst_ip = inst_ip
3069
3070     if self.op.start and not self.op.ip_check:
3071       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3072                                  " adding an instance in start mode")
3073
3074     if self.op.ip_check:
3075       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3076                        constants.DEFAULT_NODED_PORT):
3077         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3078                                    (hostname1.ip, instance_name))
3079
3080     # MAC address verification
3081     if self.op.mac != "auto":
3082       if not utils.IsValidMac(self.op.mac.lower()):
3083         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3084                                    self.op.mac)
3085
3086     # bridge verification
3087     bridge = getattr(self.op, "bridge", None)
3088     if bridge is None:
3089       self.op.bridge = self.cfg.GetDefBridge()
3090     else:
3091       self.op.bridge = bridge
3092
3093     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3094       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3095                                  " destination node '%s'" %
3096                                  (self.op.bridge, pnode.name))
3097
3098     # boot order verification
3099     if self.op.hvm_boot_order is not None:
3100       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3101         raise errors.OpPrereqError("invalid boot order specified,"
3102                                    " must be one or more of [acdn]")
3103
3104     if self.op.start:
3105       self.instance_status = 'up'
3106     else:
3107       self.instance_status = 'down'
3108
3109   def Exec(self, feedback_fn):
3110     """Create and add the instance to the cluster.
3111
3112     """
3113     instance = self.op.instance_name
3114     pnode_name = self.pnode.name
3115
3116     if self.op.mac == "auto":
3117       mac_address = self.cfg.GenerateMAC()
3118     else:
3119       mac_address = self.op.mac
3120
3121     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3122     if self.inst_ip is not None:
3123       nic.ip = self.inst_ip
3124
3125     ht_kind = self.sstore.GetHypervisorType()
3126     if ht_kind in constants.HTS_REQ_PORT:
3127       network_port = self.cfg.AllocatePort()
3128     else:
3129       network_port = None
3130
3131     disks = _GenerateDiskTemplate(self.cfg,
3132                                   self.op.disk_template,
3133                                   instance, pnode_name,
3134                                   self.secondaries, self.op.disk_size,
3135                                   self.op.swap_size)
3136
3137     iobj = objects.Instance(name=instance, os=self.op.os_type,
3138                             primary_node=pnode_name,
3139                             memory=self.op.mem_size,
3140                             vcpus=self.op.vcpus,
3141                             nics=[nic], disks=disks,
3142                             disk_template=self.op.disk_template,
3143                             status=self.instance_status,
3144                             network_port=network_port,
3145                             kernel_path=self.op.kernel_path,
3146                             initrd_path=self.op.initrd_path,
3147                             hvm_boot_order=self.op.hvm_boot_order,
3148                             )
3149
3150     feedback_fn("* creating instance disks...")
3151     if not _CreateDisks(self.cfg, iobj):
3152       _RemoveDisks(iobj, self.cfg)
3153       raise errors.OpExecError("Device creation failed, reverting...")
3154
3155     feedback_fn("adding instance %s to cluster config" % instance)
3156
3157     self.cfg.AddInstance(iobj)
3158
3159     if self.op.wait_for_sync:
3160       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3161     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3162       # make sure the disks are not degraded (still sync-ing is ok)
3163       time.sleep(15)
3164       feedback_fn("* checking mirrors status")
3165       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3166     else:
3167       disk_abort = False
3168
3169     if disk_abort:
3170       _RemoveDisks(iobj, self.cfg)
3171       self.cfg.RemoveInstance(iobj.name)
3172       raise errors.OpExecError("There are some degraded disks for"
3173                                " this instance")
3174
3175     feedback_fn("creating os for instance %s on node %s" %
3176                 (instance, pnode_name))
3177
3178     if iobj.disk_template != constants.DT_DISKLESS:
3179       if self.op.mode == constants.INSTANCE_CREATE:
3180         feedback_fn("* running the instance OS create scripts...")
3181         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3182           raise errors.OpExecError("could not add os for instance %s"
3183                                    " on node %s" %
3184                                    (instance, pnode_name))
3185
3186       elif self.op.mode == constants.INSTANCE_IMPORT:
3187         feedback_fn("* running the instance OS import scripts...")
3188         src_node = self.op.src_node
3189         src_image = self.src_image
3190         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3191                                                 src_node, src_image):
3192           raise errors.OpExecError("Could not import os for instance"
3193                                    " %s on node %s" %
3194                                    (instance, pnode_name))
3195       else:
3196         # also checked in the prereq part
3197         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3198                                      % self.op.mode)
3199
3200     if self.op.start:
3201       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3202       feedback_fn("* starting instance...")
3203       if not rpc.call_instance_start(pnode_name, iobj, None):
3204         raise errors.OpExecError("Could not start instance")
3205
3206
3207 class LUConnectConsole(NoHooksLU):
3208   """Connect to an instance's console.
3209
3210   This is somewhat special in that it returns the command line that
3211   you need to run on the master node in order to connect to the
3212   console.
3213
3214   """
3215   _OP_REQP = ["instance_name"]
3216
3217   def CheckPrereq(self):
3218     """Check prerequisites.
3219
3220     This checks that the instance is in the cluster.
3221
3222     """
3223     instance = self.cfg.GetInstanceInfo(
3224       self.cfg.ExpandInstanceName(self.op.instance_name))
3225     if instance is None:
3226       raise errors.OpPrereqError("Instance '%s' not known" %
3227                                  self.op.instance_name)
3228     self.instance = instance
3229
3230   def Exec(self, feedback_fn):
3231     """Connect to the console of an instance
3232
3233     """
3234     instance = self.instance
3235     node = instance.primary_node
3236
3237     node_insts = rpc.call_instance_list([node])[node]
3238     if node_insts is False:
3239       raise errors.OpExecError("Can't connect to node %s." % node)
3240
3241     if instance.name not in node_insts:
3242       raise errors.OpExecError("Instance %s is not running." % instance.name)
3243
3244     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3245
3246     hyper = hypervisor.GetHypervisor()
3247     console_cmd = hyper.GetShellCommandForConsole(instance)
3248     # build ssh cmdline
3249     argv = ["ssh", "-q", "-t"]
3250     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3251     argv.extend(ssh.BATCH_MODE_OPTS)
3252     argv.append(node)
3253     argv.append(console_cmd)
3254     return "ssh", argv
3255
3256
3257 class LUAddMDDRBDComponent(LogicalUnit):
3258   """Adda new mirror member to an instance's disk.
3259
3260   """
3261   HPATH = "mirror-add"
3262   HTYPE = constants.HTYPE_INSTANCE
3263   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3264
3265   def BuildHooksEnv(self):
3266     """Build hooks env.
3267
3268     This runs on the master, the primary and all the secondaries.
3269
3270     """
3271     env = {
3272       "NEW_SECONDARY": self.op.remote_node,
3273       "DISK_NAME": self.op.disk_name,
3274       }
3275     env.update(_BuildInstanceHookEnvByObject(self.instance))
3276     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3277           self.op.remote_node,] + list(self.instance.secondary_nodes)
3278     return env, nl, nl
3279
3280   def CheckPrereq(self):
3281     """Check prerequisites.
3282
3283     This checks that the instance is in the cluster.
3284
3285     """
3286     instance = self.cfg.GetInstanceInfo(
3287       self.cfg.ExpandInstanceName(self.op.instance_name))
3288     if instance is None:
3289       raise errors.OpPrereqError("Instance '%s' not known" %
3290                                  self.op.instance_name)
3291     self.instance = instance
3292
3293     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3294     if remote_node is None:
3295       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3296     self.remote_node = remote_node
3297
3298     if remote_node == instance.primary_node:
3299       raise errors.OpPrereqError("The specified node is the primary node of"
3300                                  " the instance.")
3301
3302     if instance.disk_template != constants.DT_REMOTE_RAID1:
3303       raise errors.OpPrereqError("Instance's disk layout is not"
3304                                  " remote_raid1.")
3305     for disk in instance.disks:
3306       if disk.iv_name == self.op.disk_name:
3307         break
3308     else:
3309       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3310                                  " instance." % self.op.disk_name)
3311     if len(disk.children) > 1:
3312       raise errors.OpPrereqError("The device already has two slave devices."
3313                                  " This would create a 3-disk raid1 which we"
3314                                  " don't allow.")
3315     self.disk = disk
3316
3317   def Exec(self, feedback_fn):
3318     """Add the mirror component
3319
3320     """
3321     disk = self.disk
3322     instance = self.instance
3323
3324     remote_node = self.remote_node
3325     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3326     names = _GenerateUniqueNames(self.cfg, lv_names)
3327     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3328                                      remote_node, disk.size, names)
3329
3330     logger.Info("adding new mirror component on secondary")
3331     #HARDCODE
3332     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3333                                       new_drbd, False,
3334                                       _GetInstanceInfoText(instance)):
3335       raise errors.OpExecError("Failed to create new component on secondary"
3336                                " node %s" % remote_node)
3337
3338     logger.Info("adding new mirror component on primary")
3339     #HARDCODE
3340     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3341                                     instance, new_drbd,
3342                                     _GetInstanceInfoText(instance)):
3343       # remove secondary dev
3344       self.cfg.SetDiskID(new_drbd, remote_node)
3345       rpc.call_blockdev_remove(remote_node, new_drbd)
3346       raise errors.OpExecError("Failed to create volume on primary")
3347
3348     # the device exists now
3349     # call the primary node to add the mirror to md
3350     logger.Info("adding new mirror component to md")
3351     if not rpc.call_blockdev_addchildren(instance.primary_node,
3352                                          disk, [new_drbd]):
3353       logger.Error("Can't add mirror compoment to md!")
3354       self.cfg.SetDiskID(new_drbd, remote_node)
3355       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3356         logger.Error("Can't rollback on secondary")
3357       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3358       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3359         logger.Error("Can't rollback on primary")
3360       raise errors.OpExecError("Can't add mirror component to md array")
3361
3362     disk.children.append(new_drbd)
3363
3364     self.cfg.AddInstance(instance)
3365
3366     _WaitForSync(self.cfg, instance, self.proc)
3367
3368     return 0
3369
3370
3371 class LURemoveMDDRBDComponent(LogicalUnit):
3372   """Remove a component from a remote_raid1 disk.
3373
3374   """
3375   HPATH = "mirror-remove"
3376   HTYPE = constants.HTYPE_INSTANCE
3377   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3378
3379   def BuildHooksEnv(self):
3380     """Build hooks env.
3381
3382     This runs on the master, the primary and all the secondaries.
3383
3384     """
3385     env = {
3386       "DISK_NAME": self.op.disk_name,
3387       "DISK_ID": self.op.disk_id,
3388       "OLD_SECONDARY": self.old_secondary,
3389       }
3390     env.update(_BuildInstanceHookEnvByObject(self.instance))
3391     nl = [self.sstore.GetMasterNode(),
3392           self.instance.primary_node] + list(self.instance.secondary_nodes)
3393     return env, nl, nl
3394
3395   def CheckPrereq(self):
3396     """Check prerequisites.
3397
3398     This checks that the instance is in the cluster.
3399
3400     """
3401     instance = self.cfg.GetInstanceInfo(
3402       self.cfg.ExpandInstanceName(self.op.instance_name))
3403     if instance is None:
3404       raise errors.OpPrereqError("Instance '%s' not known" %
3405                                  self.op.instance_name)
3406     self.instance = instance
3407
3408     if instance.disk_template != constants.DT_REMOTE_RAID1:
3409       raise errors.OpPrereqError("Instance's disk layout is not"
3410                                  " remote_raid1.")
3411     for disk in instance.disks:
3412       if disk.iv_name == self.op.disk_name:
3413         break
3414     else:
3415       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3416                                  " instance." % self.op.disk_name)
3417     for child in disk.children:
3418       if (child.dev_type == constants.LD_DRBD7 and
3419           child.logical_id[2] == self.op.disk_id):
3420         break
3421     else:
3422       raise errors.OpPrereqError("Can't find the device with this port.")
3423
3424     if len(disk.children) < 2:
3425       raise errors.OpPrereqError("Cannot remove the last component from"
3426                                  " a mirror.")
3427     self.disk = disk
3428     self.child = child
3429     if self.child.logical_id[0] == instance.primary_node:
3430       oid = 1
3431     else:
3432       oid = 0
3433     self.old_secondary = self.child.logical_id[oid]
3434
3435   def Exec(self, feedback_fn):
3436     """Remove the mirror component
3437
3438     """
3439     instance = self.instance
3440     disk = self.disk
3441     child = self.child
3442     logger.Info("remove mirror component")
3443     self.cfg.SetDiskID(disk, instance.primary_node)
3444     if not rpc.call_blockdev_removechildren(instance.primary_node,
3445                                             disk, [child]):
3446       raise errors.OpExecError("Can't remove child from mirror.")
3447
3448     for node in child.logical_id[:2]:
3449       self.cfg.SetDiskID(child, node)
3450       if not rpc.call_blockdev_remove(node, child):
3451         logger.Error("Warning: failed to remove device from node %s,"
3452                      " continuing operation." % node)
3453
3454     disk.children.remove(child)
3455     self.cfg.AddInstance(instance)
3456
3457
3458 class LUReplaceDisks(LogicalUnit):
3459   """Replace the disks of an instance.
3460
3461   """
3462   HPATH = "mirrors-replace"
3463   HTYPE = constants.HTYPE_INSTANCE
3464   _OP_REQP = ["instance_name", "mode", "disks"]
3465
3466   def BuildHooksEnv(self):
3467     """Build hooks env.
3468
3469     This runs on the master, the primary and all the secondaries.
3470
3471     """
3472     env = {
3473       "MODE": self.op.mode,
3474       "NEW_SECONDARY": self.op.remote_node,
3475       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3476       }
3477     env.update(_BuildInstanceHookEnvByObject(self.instance))
3478     nl = [
3479       self.sstore.GetMasterNode(),
3480       self.instance.primary_node,
3481       ]
3482     if self.op.remote_node is not None:
3483       nl.append(self.op.remote_node)
3484     return env, nl, nl
3485
3486   def CheckPrereq(self):
3487     """Check prerequisites.
3488
3489     This checks that the instance is in the cluster.
3490
3491     """
3492     instance = self.cfg.GetInstanceInfo(
3493       self.cfg.ExpandInstanceName(self.op.instance_name))
3494     if instance is None:
3495       raise errors.OpPrereqError("Instance '%s' not known" %
3496                                  self.op.instance_name)
3497     self.instance = instance
3498     self.op.instance_name = instance.name
3499
3500     if instance.disk_template not in constants.DTS_NET_MIRROR:
3501       raise errors.OpPrereqError("Instance's disk layout is not"
3502                                  " network mirrored.")
3503
3504     if len(instance.secondary_nodes) != 1:
3505       raise errors.OpPrereqError("The instance has a strange layout,"
3506                                  " expected one secondary but found %d" %
3507                                  len(instance.secondary_nodes))
3508
3509     self.sec_node = instance.secondary_nodes[0]
3510
3511     remote_node = getattr(self.op, "remote_node", None)
3512     if remote_node is not None:
3513       remote_node = self.cfg.ExpandNodeName(remote_node)
3514       if remote_node is None:
3515         raise errors.OpPrereqError("Node '%s' not known" %
3516                                    self.op.remote_node)
3517       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3518     else:
3519       self.remote_node_info = None
3520     if remote_node == instance.primary_node:
3521       raise errors.OpPrereqError("The specified node is the primary node of"
3522                                  " the instance.")
3523     elif remote_node == self.sec_node:
3524       if self.op.mode == constants.REPLACE_DISK_SEC:
3525         # this is for DRBD8, where we can't execute the same mode of
3526         # replacement as for drbd7 (no different port allocated)
3527         raise errors.OpPrereqError("Same secondary given, cannot execute"
3528                                    " replacement")
3529       # the user gave the current secondary, switch to
3530       # 'no-replace-secondary' mode for drbd7
3531       remote_node = None
3532     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3533         self.op.mode != constants.REPLACE_DISK_ALL):
3534       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3535                                  " disks replacement, not individual ones")
3536     if instance.disk_template == constants.DT_DRBD8:
3537       if (self.op.mode == constants.REPLACE_DISK_ALL and
3538           remote_node is not None):
3539         # switch to replace secondary mode
3540         self.op.mode = constants.REPLACE_DISK_SEC
3541
3542       if self.op.mode == constants.REPLACE_DISK_ALL:
3543         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3544                                    " secondary disk replacement, not"
3545                                    " both at once")
3546       elif self.op.mode == constants.REPLACE_DISK_PRI:
3547         if remote_node is not None:
3548           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3549                                      " the secondary while doing a primary"
3550                                      " node disk replacement")
3551         self.tgt_node = instance.primary_node
3552         self.oth_node = instance.secondary_nodes[0]
3553       elif self.op.mode == constants.REPLACE_DISK_SEC:
3554         self.new_node = remote_node # this can be None, in which case
3555                                     # we don't change the secondary
3556         self.tgt_node = instance.secondary_nodes[0]
3557         self.oth_node = instance.primary_node
3558       else:
3559         raise errors.ProgrammerError("Unhandled disk replace mode")
3560
3561     for name in self.op.disks:
3562       if instance.FindDisk(name) is None:
3563         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3564                                    (name, instance.name))
3565     self.op.remote_node = remote_node
3566
3567   def _ExecRR1(self, feedback_fn):
3568     """Replace the disks of an instance.
3569
3570     """
3571     instance = self.instance
3572     iv_names = {}
3573     # start of work
3574     if self.op.remote_node is None:
3575       remote_node = self.sec_node
3576     else:
3577       remote_node = self.op.remote_node
3578     cfg = self.cfg
3579     for dev in instance.disks:
3580       size = dev.size
3581       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3582       names = _GenerateUniqueNames(cfg, lv_names)
3583       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3584                                        remote_node, size, names)
3585       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3586       logger.Info("adding new mirror component on secondary for %s" %
3587                   dev.iv_name)
3588       #HARDCODE
3589       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3590                                         new_drbd, False,
3591                                         _GetInstanceInfoText(instance)):
3592         raise errors.OpExecError("Failed to create new component on secondary"
3593                                  " node %s. Full abort, cleanup manually!" %
3594                                  remote_node)
3595
3596       logger.Info("adding new mirror component on primary")
3597       #HARDCODE
3598       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3599                                       instance, new_drbd,
3600                                       _GetInstanceInfoText(instance)):
3601         # remove secondary dev
3602         cfg.SetDiskID(new_drbd, remote_node)
3603         rpc.call_blockdev_remove(remote_node, new_drbd)
3604         raise errors.OpExecError("Failed to create volume on primary!"
3605                                  " Full abort, cleanup manually!!")
3606
3607       # the device exists now
3608       # call the primary node to add the mirror to md
3609       logger.Info("adding new mirror component to md")
3610       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3611                                            [new_drbd]):
3612         logger.Error("Can't add mirror compoment to md!")
3613         cfg.SetDiskID(new_drbd, remote_node)
3614         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3615           logger.Error("Can't rollback on secondary")
3616         cfg.SetDiskID(new_drbd, instance.primary_node)
3617         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3618           logger.Error("Can't rollback on primary")
3619         raise errors.OpExecError("Full abort, cleanup manually!!")
3620
3621       dev.children.append(new_drbd)
3622       cfg.AddInstance(instance)
3623
3624     # this can fail as the old devices are degraded and _WaitForSync
3625     # does a combined result over all disks, so we don't check its
3626     # return value
3627     _WaitForSync(cfg, instance, self.proc, unlock=True)
3628
3629     # so check manually all the devices
3630     for name in iv_names:
3631       dev, child, new_drbd = iv_names[name]
3632       cfg.SetDiskID(dev, instance.primary_node)
3633       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3634       if is_degr:
3635         raise errors.OpExecError("MD device %s is degraded!" % name)
3636       cfg.SetDiskID(new_drbd, instance.primary_node)
3637       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3638       if is_degr:
3639         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3640
3641     for name in iv_names:
3642       dev, child, new_drbd = iv_names[name]
3643       logger.Info("remove mirror %s component" % name)
3644       cfg.SetDiskID(dev, instance.primary_node)
3645       if not rpc.call_blockdev_removechildren(instance.primary_node,
3646                                               dev, [child]):
3647         logger.Error("Can't remove child from mirror, aborting"
3648                      " *this device cleanup*.\nYou need to cleanup manually!!")
3649         continue
3650
3651       for node in child.logical_id[:2]:
3652         logger.Info("remove child device on %s" % node)
3653         cfg.SetDiskID(child, node)
3654         if not rpc.call_blockdev_remove(node, child):
3655           logger.Error("Warning: failed to remove device from node %s,"
3656                        " continuing operation." % node)
3657
3658       dev.children.remove(child)
3659
3660       cfg.AddInstance(instance)
3661
3662   def _ExecD8DiskOnly(self, feedback_fn):
3663     """Replace a disk on the primary or secondary for dbrd8.
3664
3665     The algorithm for replace is quite complicated:
3666       - for each disk to be replaced:
3667         - create new LVs on the target node with unique names
3668         - detach old LVs from the drbd device
3669         - rename old LVs to name_replaced.<time_t>
3670         - rename new LVs to old LVs
3671         - attach the new LVs (with the old names now) to the drbd device
3672       - wait for sync across all devices
3673       - for each modified disk:
3674         - remove old LVs (which have the name name_replaces.<time_t>)
3675
3676     Failures are not very well handled.
3677
3678     """
3679     steps_total = 6
3680     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3681     instance = self.instance
3682     iv_names = {}
3683     vgname = self.cfg.GetVGName()
3684     # start of work
3685     cfg = self.cfg
3686     tgt_node = self.tgt_node
3687     oth_node = self.oth_node
3688
3689     # Step: check device activation
3690     self.proc.LogStep(1, steps_total, "check device existence")
3691     info("checking volume groups")
3692     my_vg = cfg.GetVGName()
3693     results = rpc.call_vg_list([oth_node, tgt_node])
3694     if not results:
3695       raise errors.OpExecError("Can't list volume groups on the nodes")
3696     for node in oth_node, tgt_node:
3697       res = results.get(node, False)
3698       if not res or my_vg not in res:
3699         raise errors.OpExecError("Volume group '%s' not found on %s" %
3700                                  (my_vg, node))
3701     for dev in instance.disks:
3702       if not dev.iv_name in self.op.disks:
3703         continue
3704       for node in tgt_node, oth_node:
3705         info("checking %s on %s" % (dev.iv_name, node))
3706         cfg.SetDiskID(dev, node)
3707         if not rpc.call_blockdev_find(node, dev):
3708           raise errors.OpExecError("Can't find device %s on node %s" %
3709                                    (dev.iv_name, node))
3710
3711     # Step: check other node consistency
3712     self.proc.LogStep(2, steps_total, "check peer consistency")
3713     for dev in instance.disks:
3714       if not dev.iv_name in self.op.disks:
3715         continue
3716       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3717       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3718                                    oth_node==instance.primary_node):
3719         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3720                                  " to replace disks on this node (%s)" %
3721                                  (oth_node, tgt_node))
3722
3723     # Step: create new storage
3724     self.proc.LogStep(3, steps_total, "allocate new storage")
3725     for dev in instance.disks:
3726       if not dev.iv_name in self.op.disks:
3727         continue
3728       size = dev.size
3729       cfg.SetDiskID(dev, tgt_node)
3730       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3731       names = _GenerateUniqueNames(cfg, lv_names)
3732       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3733                              logical_id=(vgname, names[0]))
3734       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3735                              logical_id=(vgname, names[1]))
3736       new_lvs = [lv_data, lv_meta]
3737       old_lvs = dev.children
3738       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3739       info("creating new local storage on %s for %s" %
3740            (tgt_node, dev.iv_name))
3741       # since we *always* want to create this LV, we use the
3742       # _Create...OnPrimary (which forces the creation), even if we
3743       # are talking about the secondary node
3744       for new_lv in new_lvs:
3745         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3746                                         _GetInstanceInfoText(instance)):
3747           raise errors.OpExecError("Failed to create new LV named '%s' on"
3748                                    " node '%s'" %
3749                                    (new_lv.logical_id[1], tgt_node))
3750
3751     # Step: for each lv, detach+rename*2+attach
3752     self.proc.LogStep(4, steps_total, "change drbd configuration")
3753     for dev, old_lvs, new_lvs in iv_names.itervalues():
3754       info("detaching %s drbd from local storage" % dev.iv_name)
3755       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3756         raise errors.OpExecError("Can't detach drbd from local storage on node"
3757                                  " %s for device %s" % (tgt_node, dev.iv_name))
3758       #dev.children = []
3759       #cfg.Update(instance)
3760
3761       # ok, we created the new LVs, so now we know we have the needed
3762       # storage; as such, we proceed on the target node to rename
3763       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3764       # using the assumption than logical_id == physical_id (which in
3765       # turn is the unique_id on that node)
3766
3767       # FIXME(iustin): use a better name for the replaced LVs
3768       temp_suffix = int(time.time())
3769       ren_fn = lambda d, suff: (d.physical_id[0],
3770                                 d.physical_id[1] + "_replaced-%s" % suff)
3771       # build the rename list based on what LVs exist on the node
3772       rlist = []
3773       for to_ren in old_lvs:
3774         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3775         if find_res is not None: # device exists
3776           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3777
3778       info("renaming the old LVs on the target node")
3779       if not rpc.call_blockdev_rename(tgt_node, rlist):
3780         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3781       # now we rename the new LVs to the old LVs
3782       info("renaming the new LVs on the target node")
3783       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3784       if not rpc.call_blockdev_rename(tgt_node, rlist):
3785         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3786
3787       for old, new in zip(old_lvs, new_lvs):
3788         new.logical_id = old.logical_id
3789         cfg.SetDiskID(new, tgt_node)
3790
3791       for disk in old_lvs:
3792         disk.logical_id = ren_fn(disk, temp_suffix)
3793         cfg.SetDiskID(disk, tgt_node)
3794
3795       # now that the new lvs have the old name, we can add them to the device
3796       info("adding new mirror component on %s" % tgt_node)
3797       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3798         for new_lv in new_lvs:
3799           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3800             warning("Can't rollback device %s", hint="manually cleanup unused"
3801                     " logical volumes")
3802         raise errors.OpExecError("Can't add local storage to drbd")
3803
3804       dev.children = new_lvs
3805       cfg.Update(instance)
3806
3807     # Step: wait for sync
3808
3809     # this can fail as the old devices are degraded and _WaitForSync
3810     # does a combined result over all disks, so we don't check its
3811     # return value
3812     self.proc.LogStep(5, steps_total, "sync devices")
3813     _WaitForSync(cfg, instance, self.proc, unlock=True)
3814
3815     # so check manually all the devices
3816     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3817       cfg.SetDiskID(dev, instance.primary_node)
3818       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3819       if is_degr:
3820         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3821
3822     # Step: remove old storage
3823     self.proc.LogStep(6, steps_total, "removing old storage")
3824     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3825       info("remove logical volumes for %s" % name)
3826       for lv in old_lvs:
3827         cfg.SetDiskID(lv, tgt_node)
3828         if not rpc.call_blockdev_remove(tgt_node, lv):
3829           warning("Can't remove old LV", hint="manually remove unused LVs")
3830           continue
3831
3832   def _ExecD8Secondary(self, feedback_fn):
3833     """Replace the secondary node for drbd8.
3834
3835     The algorithm for replace is quite complicated:
3836       - for all disks of the instance:
3837         - create new LVs on the new node with same names
3838         - shutdown the drbd device on the old secondary
3839         - disconnect the drbd network on the primary
3840         - create the drbd device on the new secondary
3841         - network attach the drbd on the primary, using an artifice:
3842           the drbd code for Attach() will connect to the network if it
3843           finds a device which is connected to the good local disks but
3844           not network enabled
3845       - wait for sync across all devices
3846       - remove all disks from the old secondary
3847
3848     Failures are not very well handled.
3849
3850     """
3851     steps_total = 6
3852     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3853     instance = self.instance
3854     iv_names = {}
3855     vgname = self.cfg.GetVGName()
3856     # start of work
3857     cfg = self.cfg
3858     old_node = self.tgt_node
3859     new_node = self.new_node
3860     pri_node = instance.primary_node
3861
3862     # Step: check device activation
3863     self.proc.LogStep(1, steps_total, "check device existence")
3864     info("checking volume groups")
3865     my_vg = cfg.GetVGName()
3866     results = rpc.call_vg_list([pri_node, new_node])
3867     if not results:
3868       raise errors.OpExecError("Can't list volume groups on the nodes")
3869     for node in pri_node, new_node:
3870       res = results.get(node, False)
3871       if not res or my_vg not in res:
3872         raise errors.OpExecError("Volume group '%s' not found on %s" %
3873                                  (my_vg, node))
3874     for dev in instance.disks:
3875       if not dev.iv_name in self.op.disks:
3876         continue
3877       info("checking %s on %s" % (dev.iv_name, pri_node))
3878       cfg.SetDiskID(dev, pri_node)
3879       if not rpc.call_blockdev_find(pri_node, dev):
3880         raise errors.OpExecError("Can't find device %s on node %s" %
3881                                  (dev.iv_name, pri_node))
3882
3883     # Step: check other node consistency
3884     self.proc.LogStep(2, steps_total, "check peer consistency")
3885     for dev in instance.disks:
3886       if not dev.iv_name in self.op.disks:
3887         continue
3888       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3889       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3890         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3891                                  " unsafe to replace the secondary" %
3892                                  pri_node)
3893
3894     # Step: create new storage
3895     self.proc.LogStep(3, steps_total, "allocate new storage")
3896     for dev in instance.disks:
3897       size = dev.size
3898       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3899       # since we *always* want to create this LV, we use the
3900       # _Create...OnPrimary (which forces the creation), even if we
3901       # are talking about the secondary node
3902       for new_lv in dev.children:
3903         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3904                                         _GetInstanceInfoText(instance)):
3905           raise errors.OpExecError("Failed to create new LV named '%s' on"
3906                                    " node '%s'" %
3907                                    (new_lv.logical_id[1], new_node))
3908
3909       iv_names[dev.iv_name] = (dev, dev.children)
3910
3911     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3912     for dev in instance.disks:
3913       size = dev.size
3914       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3915       # create new devices on new_node
3916       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3917                               logical_id=(pri_node, new_node,
3918                                           dev.logical_id[2]),
3919                               children=dev.children)
3920       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3921                                         new_drbd, False,
3922                                       _GetInstanceInfoText(instance)):
3923         raise errors.OpExecError("Failed to create new DRBD on"
3924                                  " node '%s'" % new_node)
3925
3926     for dev in instance.disks:
3927       # we have new devices, shutdown the drbd on the old secondary
3928       info("shutting down drbd for %s on old node" % dev.iv_name)
3929       cfg.SetDiskID(dev, old_node)
3930       if not rpc.call_blockdev_shutdown(old_node, dev):
3931         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3932                 hint="Please cleanup this device manually as soon as possible")
3933
3934     info("detaching primary drbds from the network (=> standalone)")
3935     done = 0
3936     for dev in instance.disks:
3937       cfg.SetDiskID(dev, pri_node)
3938       # set the physical (unique in bdev terms) id to None, meaning
3939       # detach from network
3940       dev.physical_id = (None,) * len(dev.physical_id)
3941       # and 'find' the device, which will 'fix' it to match the
3942       # standalone state
3943       if rpc.call_blockdev_find(pri_node, dev):
3944         done += 1
3945       else:
3946         warning("Failed to detach drbd %s from network, unusual case" %
3947                 dev.iv_name)
3948
3949     if not done:
3950       # no detaches succeeded (very unlikely)
3951       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3952
3953     # if we managed to detach at least one, we update all the disks of
3954     # the instance to point to the new secondary
3955     info("updating instance configuration")
3956     for dev in instance.disks:
3957       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3958       cfg.SetDiskID(dev, pri_node)
3959     cfg.Update(instance)
3960
3961     # and now perform the drbd attach
3962     info("attaching primary drbds to new secondary (standalone => connected)")
3963     failures = []
3964     for dev in instance.disks:
3965       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3966       # since the attach is smart, it's enough to 'find' the device,
3967       # it will automatically activate the network, if the physical_id
3968       # is correct
3969       cfg.SetDiskID(dev, pri_node)
3970       if not rpc.call_blockdev_find(pri_node, dev):
3971         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3972                 "please do a gnt-instance info to see the status of disks")
3973
3974     # this can fail as the old devices are degraded and _WaitForSync
3975     # does a combined result over all disks, so we don't check its
3976     # return value
3977     self.proc.LogStep(5, steps_total, "sync devices")
3978     _WaitForSync(cfg, instance, self.proc, unlock=True)
3979
3980     # so check manually all the devices
3981     for name, (dev, old_lvs) in iv_names.iteritems():
3982       cfg.SetDiskID(dev, pri_node)
3983       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3984       if is_degr:
3985         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3986
3987     self.proc.LogStep(6, steps_total, "removing old storage")
3988     for name, (dev, old_lvs) in iv_names.iteritems():
3989       info("remove logical volumes for %s" % name)
3990       for lv in old_lvs:
3991         cfg.SetDiskID(lv, old_node)
3992         if not rpc.call_blockdev_remove(old_node, lv):
3993           warning("Can't remove LV on old secondary",
3994                   hint="Cleanup stale volumes by hand")
3995
3996   def Exec(self, feedback_fn):
3997     """Execute disk replacement.
3998
3999     This dispatches the disk replacement to the appropriate handler.
4000
4001     """
4002     instance = self.instance
4003     if instance.disk_template == constants.DT_REMOTE_RAID1:
4004       fn = self._ExecRR1
4005     elif instance.disk_template == constants.DT_DRBD8:
4006       if self.op.remote_node is None:
4007         fn = self._ExecD8DiskOnly
4008       else:
4009         fn = self._ExecD8Secondary
4010     else:
4011       raise errors.ProgrammerError("Unhandled disk replacement case")
4012     return fn(feedback_fn)
4013
4014
4015 class LUQueryInstanceData(NoHooksLU):
4016   """Query runtime instance data.
4017
4018   """
4019   _OP_REQP = ["instances"]
4020
4021   def CheckPrereq(self):
4022     """Check prerequisites.
4023
4024     This only checks the optional instance list against the existing names.
4025
4026     """
4027     if not isinstance(self.op.instances, list):
4028       raise errors.OpPrereqError("Invalid argument type 'instances'")
4029     if self.op.instances:
4030       self.wanted_instances = []
4031       names = self.op.instances
4032       for name in names:
4033         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4034         if instance is None:
4035           raise errors.OpPrereqError("No such instance name '%s'" % name)
4036         self.wanted_instances.append(instance)
4037     else:
4038       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4039                                in self.cfg.GetInstanceList()]
4040     return
4041
4042
4043   def _ComputeDiskStatus(self, instance, snode, dev):
4044     """Compute block device status.
4045
4046     """
4047     self.cfg.SetDiskID(dev, instance.primary_node)
4048     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4049     if dev.dev_type in constants.LDS_DRBD:
4050       # we change the snode then (otherwise we use the one passed in)
4051       if dev.logical_id[0] == instance.primary_node:
4052         snode = dev.logical_id[1]
4053       else:
4054         snode = dev.logical_id[0]
4055
4056     if snode:
4057       self.cfg.SetDiskID(dev, snode)
4058       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4059     else:
4060       dev_sstatus = None
4061
4062     if dev.children:
4063       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4064                       for child in dev.children]
4065     else:
4066       dev_children = []
4067
4068     data = {
4069       "iv_name": dev.iv_name,
4070       "dev_type": dev.dev_type,
4071       "logical_id": dev.logical_id,
4072       "physical_id": dev.physical_id,
4073       "pstatus": dev_pstatus,
4074       "sstatus": dev_sstatus,
4075       "children": dev_children,
4076       }
4077
4078     return data
4079
4080   def Exec(self, feedback_fn):
4081     """Gather and return data"""
4082     result = {}
4083     for instance in self.wanted_instances:
4084       remote_info = rpc.call_instance_info(instance.primary_node,
4085                                                 instance.name)
4086       if remote_info and "state" in remote_info:
4087         remote_state = "up"
4088       else:
4089         remote_state = "down"
4090       if instance.status == "down":
4091         config_state = "down"
4092       else:
4093         config_state = "up"
4094
4095       disks = [self._ComputeDiskStatus(instance, None, device)
4096                for device in instance.disks]
4097
4098       idict = {
4099         "name": instance.name,
4100         "config_state": config_state,
4101         "run_state": remote_state,
4102         "pnode": instance.primary_node,
4103         "snodes": instance.secondary_nodes,
4104         "os": instance.os,
4105         "memory": instance.memory,
4106         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4107         "disks": disks,
4108         "network_port": instance.network_port,
4109         "vcpus": instance.vcpus,
4110         "kernel_path": instance.kernel_path,
4111         "initrd_path": instance.initrd_path,
4112         "hvm_boot_order": instance.hvm_boot_order,
4113         }
4114
4115       result[instance.name] = idict
4116
4117     return result
4118
4119
4120 class LUSetInstanceParms(LogicalUnit):
4121   """Modifies an instances's parameters.
4122
4123   """
4124   HPATH = "instance-modify"
4125   HTYPE = constants.HTYPE_INSTANCE
4126   _OP_REQP = ["instance_name"]
4127
4128   def BuildHooksEnv(self):
4129     """Build hooks env.
4130
4131     This runs on the master, primary and secondaries.
4132
4133     """
4134     args = dict()
4135     if self.mem:
4136       args['memory'] = self.mem
4137     if self.vcpus:
4138       args['vcpus'] = self.vcpus
4139     if self.do_ip or self.do_bridge or self.mac:
4140       if self.do_ip:
4141         ip = self.ip
4142       else:
4143         ip = self.instance.nics[0].ip
4144       if self.bridge:
4145         bridge = self.bridge
4146       else:
4147         bridge = self.instance.nics[0].bridge
4148       if self.mac:
4149         mac = self.mac
4150       else:
4151         mac = self.instance.nics[0].mac
4152       args['nics'] = [(ip, bridge, mac)]
4153     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4154     nl = [self.sstore.GetMasterNode(),
4155           self.instance.primary_node] + list(self.instance.secondary_nodes)
4156     return env, nl, nl
4157
4158   def CheckPrereq(self):
4159     """Check prerequisites.
4160
4161     This only checks the instance list against the existing names.
4162
4163     """
4164     self.mem = getattr(self.op, "mem", None)
4165     self.vcpus = getattr(self.op, "vcpus", None)
4166     self.ip = getattr(self.op, "ip", None)
4167     self.mac = getattr(self.op, "mac", None)
4168     self.bridge = getattr(self.op, "bridge", None)
4169     self.kernel_path = getattr(self.op, "kernel_path", None)
4170     self.initrd_path = getattr(self.op, "initrd_path", None)
4171     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4172     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4173                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4174     if all_parms.count(None) == len(all_parms):
4175       raise errors.OpPrereqError("No changes submitted")
4176     if self.mem is not None:
4177       try:
4178         self.mem = int(self.mem)
4179       except ValueError, err:
4180         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4181     if self.vcpus is not None:
4182       try:
4183         self.vcpus = int(self.vcpus)
4184       except ValueError, err:
4185         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4186     if self.ip is not None:
4187       self.do_ip = True
4188       if self.ip.lower() == "none":
4189         self.ip = None
4190       else:
4191         if not utils.IsValidIP(self.ip):
4192           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4193     else:
4194       self.do_ip = False
4195     self.do_bridge = (self.bridge is not None)
4196     if self.mac is not None:
4197       if self.cfg.IsMacInUse(self.mac):
4198         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4199                                    self.mac)
4200       if not utils.IsValidMac(self.mac):
4201         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4202
4203     if self.kernel_path is not None:
4204       self.do_kernel_path = True
4205       if self.kernel_path == constants.VALUE_NONE:
4206         raise errors.OpPrereqError("Can't set instance to no kernel")
4207
4208       if self.kernel_path != constants.VALUE_DEFAULT:
4209         if not os.path.isabs(self.kernel_path):
4210           raise errors.OpPrereqError("The kernel path must be an absolute"
4211                                     " filename")
4212     else:
4213       self.do_kernel_path = False
4214
4215     if self.initrd_path is not None:
4216       self.do_initrd_path = True
4217       if self.initrd_path not in (constants.VALUE_NONE,
4218                                   constants.VALUE_DEFAULT):
4219         if not os.path.isabs(self.initrd_path):
4220           raise errors.OpPrereqError("The initrd path must be an absolute"
4221                                     " filename")
4222     else:
4223       self.do_initrd_path = False
4224
4225     # boot order verification
4226     if self.hvm_boot_order is not None:
4227       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4228         if len(self.hvm_boot_order.strip("acdn")) != 0:
4229           raise errors.OpPrereqError("invalid boot order specified,"
4230                                      " must be one or more of [acdn]"
4231                                      " or 'default'")
4232
4233     instance = self.cfg.GetInstanceInfo(
4234       self.cfg.ExpandInstanceName(self.op.instance_name))
4235     if instance is None:
4236       raise errors.OpPrereqError("No such instance name '%s'" %
4237                                  self.op.instance_name)
4238     self.op.instance_name = instance.name
4239     self.instance = instance
4240     return
4241
4242   def Exec(self, feedback_fn):
4243     """Modifies an instance.
4244
4245     All parameters take effect only at the next restart of the instance.
4246     """
4247     result = []
4248     instance = self.instance
4249     if self.mem:
4250       instance.memory = self.mem
4251       result.append(("mem", self.mem))
4252     if self.vcpus:
4253       instance.vcpus = self.vcpus
4254       result.append(("vcpus",  self.vcpus))
4255     if self.do_ip:
4256       instance.nics[0].ip = self.ip
4257       result.append(("ip", self.ip))
4258     if self.bridge:
4259       instance.nics[0].bridge = self.bridge
4260       result.append(("bridge", self.bridge))
4261     if self.mac:
4262       instance.nics[0].mac = self.mac
4263       result.append(("mac", self.mac))
4264     if self.do_kernel_path:
4265       instance.kernel_path = self.kernel_path
4266       result.append(("kernel_path", self.kernel_path))
4267     if self.do_initrd_path:
4268       instance.initrd_path = self.initrd_path
4269       result.append(("initrd_path", self.initrd_path))
4270     if self.hvm_boot_order:
4271       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4272         instance.hvm_boot_order = None
4273       else:
4274         instance.hvm_boot_order = self.hvm_boot_order
4275       result.append(("hvm_boot_order", self.hvm_boot_order))
4276
4277     self.cfg.AddInstance(instance)
4278
4279     return result
4280
4281
4282 class LUQueryExports(NoHooksLU):
4283   """Query the exports list
4284
4285   """
4286   _OP_REQP = []
4287
4288   def CheckPrereq(self):
4289     """Check that the nodelist contains only existing nodes.
4290
4291     """
4292     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4293
4294   def Exec(self, feedback_fn):
4295     """Compute the list of all the exported system images.
4296
4297     Returns:
4298       a dictionary with the structure node->(export-list)
4299       where export-list is a list of the instances exported on
4300       that node.
4301
4302     """
4303     return rpc.call_export_list(self.nodes)
4304
4305
4306 class LUExportInstance(LogicalUnit):
4307   """Export an instance to an image in the cluster.
4308
4309   """
4310   HPATH = "instance-export"
4311   HTYPE = constants.HTYPE_INSTANCE
4312   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4313
4314   def BuildHooksEnv(self):
4315     """Build hooks env.
4316
4317     This will run on the master, primary node and target node.
4318
4319     """
4320     env = {
4321       "EXPORT_NODE": self.op.target_node,
4322       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4323       }
4324     env.update(_BuildInstanceHookEnvByObject(self.instance))
4325     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4326           self.op.target_node]
4327     return env, nl, nl
4328
4329   def CheckPrereq(self):
4330     """Check prerequisites.
4331
4332     This checks that the instance name is a valid one.
4333
4334     """
4335     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4336     self.instance = self.cfg.GetInstanceInfo(instance_name)
4337     if self.instance is None:
4338       raise errors.OpPrereqError("Instance '%s' not found" %
4339                                  self.op.instance_name)
4340
4341     # node verification
4342     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4343     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4344
4345     if self.dst_node is None:
4346       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4347                                  self.op.target_node)
4348     self.op.target_node = self.dst_node.name
4349
4350   def Exec(self, feedback_fn):
4351     """Export an instance to an image in the cluster.
4352
4353     """
4354     instance = self.instance
4355     dst_node = self.dst_node
4356     src_node = instance.primary_node
4357     # shutdown the instance, unless requested not to do so
4358     if self.op.shutdown:
4359       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4360       self.proc.ChainOpCode(op)
4361
4362     vgname = self.cfg.GetVGName()
4363
4364     snap_disks = []
4365
4366     try:
4367       for disk in instance.disks:
4368         if disk.iv_name == "sda":
4369           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4370           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4371
4372           if not new_dev_name:
4373             logger.Error("could not snapshot block device %s on node %s" %
4374                          (disk.logical_id[1], src_node))
4375           else:
4376             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4377                                       logical_id=(vgname, new_dev_name),
4378                                       physical_id=(vgname, new_dev_name),
4379                                       iv_name=disk.iv_name)
4380             snap_disks.append(new_dev)
4381
4382     finally:
4383       if self.op.shutdown:
4384         op = opcodes.OpStartupInstance(instance_name=instance.name,
4385                                        force=False)
4386         self.proc.ChainOpCode(op)
4387
4388     # TODO: check for size
4389
4390     for dev in snap_disks:
4391       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4392                                            instance):
4393         logger.Error("could not export block device %s from node"
4394                      " %s to node %s" %
4395                      (dev.logical_id[1], src_node, dst_node.name))
4396       if not rpc.call_blockdev_remove(src_node, dev):
4397         logger.Error("could not remove snapshot block device %s from"
4398                      " node %s" % (dev.logical_id[1], src_node))
4399
4400     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4401       logger.Error("could not finalize export for instance %s on node %s" %
4402                    (instance.name, dst_node.name))
4403
4404     nodelist = self.cfg.GetNodeList()
4405     nodelist.remove(dst_node.name)
4406
4407     # on one-node clusters nodelist will be empty after the removal
4408     # if we proceed the backup would be removed because OpQueryExports
4409     # substitutes an empty list with the full cluster node list.
4410     if nodelist:
4411       op = opcodes.OpQueryExports(nodes=nodelist)
4412       exportlist = self.proc.ChainOpCode(op)
4413       for node in exportlist:
4414         if instance.name in exportlist[node]:
4415           if not rpc.call_export_remove(node, instance.name):
4416             logger.Error("could not remove older export for instance %s"
4417                          " on node %s" % (instance.name, node))
4418
4419
4420 class TagsLU(NoHooksLU):
4421   """Generic tags LU.
4422
4423   This is an abstract class which is the parent of all the other tags LUs.
4424
4425   """
4426   def CheckPrereq(self):
4427     """Check prerequisites.
4428
4429     """
4430     if self.op.kind == constants.TAG_CLUSTER:
4431       self.target = self.cfg.GetClusterInfo()
4432     elif self.op.kind == constants.TAG_NODE:
4433       name = self.cfg.ExpandNodeName(self.op.name)
4434       if name is None:
4435         raise errors.OpPrereqError("Invalid node name (%s)" %
4436                                    (self.op.name,))
4437       self.op.name = name
4438       self.target = self.cfg.GetNodeInfo(name)
4439     elif self.op.kind == constants.TAG_INSTANCE:
4440       name = self.cfg.ExpandInstanceName(self.op.name)
4441       if name is None:
4442         raise errors.OpPrereqError("Invalid instance name (%s)" %
4443                                    (self.op.name,))
4444       self.op.name = name
4445       self.target = self.cfg.GetInstanceInfo(name)
4446     else:
4447       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4448                                  str(self.op.kind))
4449
4450
4451 class LUGetTags(TagsLU):
4452   """Returns the tags of a given object.
4453
4454   """
4455   _OP_REQP = ["kind", "name"]
4456
4457   def Exec(self, feedback_fn):
4458     """Returns the tag list.
4459
4460     """
4461     return self.target.GetTags()
4462
4463
4464 class LUSearchTags(NoHooksLU):
4465   """Searches the tags for a given pattern.
4466
4467   """
4468   _OP_REQP = ["pattern"]
4469
4470   def CheckPrereq(self):
4471     """Check prerequisites.
4472
4473     This checks the pattern passed for validity by compiling it.
4474
4475     """
4476     try:
4477       self.re = re.compile(self.op.pattern)
4478     except re.error, err:
4479       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4480                                  (self.op.pattern, err))
4481
4482   def Exec(self, feedback_fn):
4483     """Returns the tag list.
4484
4485     """
4486     cfg = self.cfg
4487     tgts = [("/cluster", cfg.GetClusterInfo())]
4488     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4489     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4490     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4491     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4492     results = []
4493     for path, target in tgts:
4494       for tag in target.GetTags():
4495         if self.re.search(tag):
4496           results.append((path, tag))
4497     return results
4498
4499
4500 class LUAddTags(TagsLU):
4501   """Sets a tag on a given object.
4502
4503   """
4504   _OP_REQP = ["kind", "name", "tags"]
4505
4506   def CheckPrereq(self):
4507     """Check prerequisites.
4508
4509     This checks the type and length of the tag name and value.
4510
4511     """
4512     TagsLU.CheckPrereq(self)
4513     for tag in self.op.tags:
4514       objects.TaggableObject.ValidateTag(tag)
4515
4516   def Exec(self, feedback_fn):
4517     """Sets the tag.
4518
4519     """
4520     try:
4521       for tag in self.op.tags:
4522         self.target.AddTag(tag)
4523     except errors.TagError, err:
4524       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4525     try:
4526       self.cfg.Update(self.target)
4527     except errors.ConfigurationError:
4528       raise errors.OpRetryError("There has been a modification to the"
4529                                 " config file and the operation has been"
4530                                 " aborted. Please retry.")
4531
4532
4533 class LUDelTags(TagsLU):
4534   """Delete a list of tags from a given object.
4535
4536   """
4537   _OP_REQP = ["kind", "name", "tags"]
4538
4539   def CheckPrereq(self):
4540     """Check prerequisites.
4541
4542     This checks that we have the given tag.
4543
4544     """
4545     TagsLU.CheckPrereq(self)
4546     for tag in self.op.tags:
4547       objects.TaggableObject.ValidateTag(tag)
4548     del_tags = frozenset(self.op.tags)
4549     cur_tags = self.target.GetTags()
4550     if not del_tags <= cur_tags:
4551       diff_tags = del_tags - cur_tags
4552       diff_names = ["'%s'" % tag for tag in diff_tags]
4553       diff_names.sort()
4554       raise errors.OpPrereqError("Tag(s) %s not found" %
4555                                  (",".join(diff_names)))
4556
4557   def Exec(self, feedback_fn):
4558     """Remove the tag from the object.
4559
4560     """
4561     for tag in self.op.tags:
4562       self.target.RemoveTag(tag)
4563     try:
4564       self.cfg.Update(self.target)
4565     except errors.ConfigurationError:
4566       raise errors.OpRetryError("There has been a modification to the"
4567                                 " config file and the operation has been"
4568                                 " aborted. Please retry.")
4569
4570 class LUTestDelay(NoHooksLU):
4571   """Sleep for a specified amount of time.
4572
4573   This LU sleeps on the master and/or nodes for a specified amoutn of
4574   time.
4575
4576   """
4577   _OP_REQP = ["duration", "on_master", "on_nodes"]
4578
4579   def CheckPrereq(self):
4580     """Check prerequisites.
4581
4582     This checks that we have a good list of nodes and/or the duration
4583     is valid.
4584
4585     """
4586
4587     if self.op.on_nodes:
4588       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4589
4590   def Exec(self, feedback_fn):
4591     """Do the actual sleep.
4592
4593     """
4594     if self.op.on_master:
4595       if not utils.TestDelay(self.op.duration):
4596         raise errors.OpExecError("Error during master delay test")
4597     if self.op.on_nodes:
4598       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4599       if not result:
4600         raise errors.OpExecError("Complete failure from rpc call")
4601       for node, node_result in result.items():
4602         if not node_result:
4603           raise errors.OpExecError("Failure during rpc call to node %s,"
4604                                    " result: %s" % (node, node_result))