Style fixes for the 1.2 branch
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement CheckPrereq which also fills in the opcode instance
53       with all the fields (even if as None)
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements (REQ_CLUSTER,
58       REQ_MASTER); note that all commands require root permissions
59
60   """
61   HPATH = None
62   HTYPE = None
63   _OP_REQP = []
64   REQ_CLUSTER = True
65   REQ_MASTER = True
66
67   def __init__(self, processor, op, cfg, sstore):
68     """Constructor for LogicalUnit.
69
70     This needs to be overriden in derived classes in order to check op
71     validity.
72
73     """
74     self.proc = processor
75     self.op = op
76     self.cfg = cfg
77     self.sstore = sstore
78     for attr_name in self._OP_REQP:
79       attr_val = getattr(op, attr_name, None)
80       if attr_val is None:
81         raise errors.OpPrereqError("Required parameter '%s' missing" %
82                                    attr_name)
83     if self.REQ_CLUSTER:
84       if not cfg.IsCluster():
85         raise errors.OpPrereqError("Cluster not initialized yet,"
86                                    " use 'gnt-cluster init' first.")
87       if self.REQ_MASTER:
88         master = sstore.GetMasterNode()
89         if master != utils.HostInfo().name:
90           raise errors.OpPrereqError("Commands must be run on the master"
91                                      " node %s" % master)
92
93   def CheckPrereq(self):
94     """Check prerequisites for this LU.
95
96     This method should check that the prerequisites for the execution
97     of this LU are fulfilled. It can do internode communication, but
98     it should be idempotent - no cluster or system changes are
99     allowed.
100
101     The method should raise errors.OpPrereqError in case something is
102     not fulfilled. Its return value is ignored.
103
104     This method should also update all the parameters of the opcode to
105     their canonical form; e.g. a short node name must be fully
106     expanded after this method has successfully completed (so that
107     hooks, logging, etc. work correctly).
108
109     """
110     raise NotImplementedError
111
112   def Exec(self, feedback_fn):
113     """Execute the LU.
114
115     This method should implement the actual work. It should raise
116     errors.OpExecError for failures that are somewhat dealt with in
117     code, or expected.
118
119     """
120     raise NotImplementedError
121
122   def BuildHooksEnv(self):
123     """Build hooks environment for this LU.
124
125     This method should return a three-node tuple consisting of: a dict
126     containing the environment that will be used for running the
127     specific hook for this LU, a list of node names on which the hook
128     should run before the execution, and a list of node names on which
129     the hook should run after the execution.
130
131     The keys of the dict must not have 'GANETI_' prefixed as this will
132     be handled in the hooks runner. Also note additional keys will be
133     added by the hooks runner. If the LU doesn't define any
134     environment, an empty dict (and not None) should be returned.
135
136     As for the node lists, the master should not be included in the
137     them, as it will be added by the hooks runner in case this LU
138     requires a cluster to run on (otherwise we don't have a node
139     list). No nodes should be returned as an empty list (and not
140     None).
141
142     Note that if the HPATH for a LU class is None, this function will
143     not be called.
144
145     """
146     raise NotImplementedError
147
148
149 class NoHooksLU(LogicalUnit):
150   """Simple LU which runs no hooks.
151
152   This LU is intended as a parent for other LogicalUnits which will
153   run no hooks, in order to reduce duplicate code.
154
155   """
156   HPATH = None
157   HTYPE = None
158
159   def BuildHooksEnv(self):
160     """Build hooks env.
161
162     This is a no-op, since we don't run hooks.
163
164     """
165     return {}, [], []
166
167
168 def _AddHostToEtcHosts(hostname):
169   """Wrapper around utils.SetEtcHostsEntry.
170
171   """
172   hi = utils.HostInfo(name=hostname)
173   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
174
175
176 def _RemoveHostFromEtcHosts(hostname):
177   """Wrapper around utils.RemoveEtcHostsEntry.
178
179   """
180   hi = utils.HostInfo(name=hostname)
181   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
182   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
183
184
185 def _GetWantedNodes(lu, nodes):
186   """Returns list of checked and expanded node names.
187
188   Args:
189     nodes: List of nodes (strings) or None for all
190
191   """
192   if not isinstance(nodes, list):
193     raise errors.OpPrereqError("Invalid argument type 'nodes'")
194
195   if nodes:
196     wanted = []
197
198     for name in nodes:
199       node = lu.cfg.ExpandNodeName(name)
200       if node is None:
201         raise errors.OpPrereqError("No such node name '%s'" % name)
202       wanted.append(node)
203
204   else:
205     wanted = lu.cfg.GetNodeList()
206   return utils.NiceSort(wanted)
207
208
209 def _GetWantedInstances(lu, instances):
210   """Returns list of checked and expanded instance names.
211
212   Args:
213     instances: List of instances (strings) or None for all
214
215   """
216   if not isinstance(instances, list):
217     raise errors.OpPrereqError("Invalid argument type 'instances'")
218
219   if instances:
220     wanted = []
221
222     for name in instances:
223       instance = lu.cfg.ExpandInstanceName(name)
224       if instance is None:
225         raise errors.OpPrereqError("No such instance name '%s'" % name)
226       wanted.append(instance)
227
228   else:
229     wanted = lu.cfg.GetInstanceList()
230   return utils.NiceSort(wanted)
231
232
233 def _CheckOutputFields(static, dynamic, selected):
234   """Checks whether all selected fields are valid.
235
236   Args:
237     static: Static fields
238     dynamic: Dynamic fields
239
240   """
241   static_fields = frozenset(static)
242   dynamic_fields = frozenset(dynamic)
243
244   all_fields = static_fields | dynamic_fields
245
246   if not all_fields.issuperset(selected):
247     raise errors.OpPrereqError("Unknown output fields selected: %s"
248                                % ",".join(frozenset(selected).
249                                           difference(all_fields)))
250
251
252 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
253                           memory, vcpus, nics):
254   """Builds instance related env variables for hooks from single variables.
255
256   Args:
257     secondary_nodes: List of secondary nodes as strings
258   """
259   env = {
260     "OP_TARGET": name,
261     "INSTANCE_NAME": name,
262     "INSTANCE_PRIMARY": primary_node,
263     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
264     "INSTANCE_OS_TYPE": os_type,
265     "INSTANCE_STATUS": status,
266     "INSTANCE_MEMORY": memory,
267     "INSTANCE_VCPUS": vcpus,
268   }
269
270   if nics:
271     nic_count = len(nics)
272     for idx, (ip, bridge, mac) in enumerate(nics):
273       if ip is None:
274         ip = ""
275       env["INSTANCE_NIC%d_IP" % idx] = ip
276       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
277       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
278   else:
279     nic_count = 0
280
281   env["INSTANCE_NIC_COUNT"] = nic_count
282
283   return env
284
285
286 def _BuildInstanceHookEnvByObject(instance, override=None):
287   """Builds instance related env variables for hooks from an object.
288
289   Args:
290     instance: objects.Instance object of instance
291     override: dict of values to override
292   """
293   args = {
294     'name': instance.name,
295     'primary_node': instance.primary_node,
296     'secondary_nodes': instance.secondary_nodes,
297     'os_type': instance.os,
298     'status': instance.os,
299     'memory': instance.memory,
300     'vcpus': instance.vcpus,
301     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
302   }
303   if override:
304     args.update(override)
305   return _BuildInstanceHookEnv(**args)
306
307
308 def _UpdateKnownHosts(fullnode, ip, pubkey):
309   """Ensure a node has a correct known_hosts entry.
310
311   Args:
312     fullnode - Fully qualified domain name of host. (str)
313     ip       - IPv4 address of host (str)
314     pubkey   - the public key of the cluster
315
316   """
317   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
318     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
319   else:
320     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
321
322   inthere = False
323
324   save_lines = []
325   add_lines = []
326   removed = False
327
328   for rawline in f:
329     logger.Debug('read %s' % (repr(rawline),))
330
331     parts = rawline.rstrip('\r\n').split()
332
333     # Ignore unwanted lines
334     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
335       fields = parts[0].split(',')
336       key = parts[2]
337
338       haveall = True
339       havesome = False
340       for spec in [ ip, fullnode ]:
341         if spec not in fields:
342           haveall = False
343         if spec in fields:
344           havesome = True
345
346       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
347       if haveall and key == pubkey:
348         inthere = True
349         save_lines.append(rawline)
350         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
351         continue
352
353       if havesome and (not haveall or key != pubkey):
354         removed = True
355         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
356         continue
357
358     save_lines.append(rawline)
359
360   if not inthere:
361     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
362     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
363
364   if removed:
365     save_lines = save_lines + add_lines
366
367     # Write a new file and replace old.
368     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
369                                    constants.DATA_DIR)
370     newfile = os.fdopen(fd, 'w')
371     try:
372       newfile.write(''.join(save_lines))
373     finally:
374       newfile.close()
375     logger.Debug("Wrote new known_hosts.")
376     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
377
378   elif add_lines:
379     # Simply appending a new line will do the trick.
380     f.seek(0, 2)
381     for add in add_lines:
382       f.write(add)
383
384   f.close()
385
386
387 def _HasValidVG(vglist, vgname):
388   """Checks if the volume group list is valid.
389
390   A non-None return value means there's an error, and the return value
391   is the error message.
392
393   """
394   vgsize = vglist.get(vgname, None)
395   if vgsize is None:
396     return "volume group '%s' missing" % vgname
397   elif vgsize < 20480:
398     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
399             (vgname, vgsize))
400   return None
401
402
403 def _InitSSHSetup(node):
404   """Setup the SSH configuration for the cluster.
405
406
407   This generates a dsa keypair for root, adds the pub key to the
408   permitted hosts and adds the hostkey to its own known hosts.
409
410   Args:
411     node: the name of this host as a fqdn
412
413   """
414   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
415
416   for name in priv_key, pub_key:
417     if os.path.exists(name):
418       utils.CreateBackup(name)
419     utils.RemoveFile(name)
420
421   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
422                          "-f", priv_key,
423                          "-q", "-N", ""])
424   if result.failed:
425     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
426                              result.output)
427
428   f = open(pub_key, 'r')
429   try:
430     utils.AddAuthorizedKey(auth_keys, f.read(8192))
431   finally:
432     f.close()
433
434
435 def _InitGanetiServerSetup(ss):
436   """Setup the necessary configuration for the initial node daemon.
437
438   This creates the nodepass file containing the shared password for
439   the cluster and also generates the SSL certificate.
440
441   """
442   # Create pseudo random password
443   randpass = sha.new(os.urandom(64)).hexdigest()
444   # and write it into sstore
445   ss.SetKey(ss.SS_NODED_PASS, randpass)
446
447   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
448                          "-days", str(365*5), "-nodes", "-x509",
449                          "-keyout", constants.SSL_CERT_FILE,
450                          "-out", constants.SSL_CERT_FILE, "-batch"])
451   if result.failed:
452     raise errors.OpExecError("could not generate server ssl cert, command"
453                              " %s had exitcode %s and error message %s" %
454                              (result.cmd, result.exit_code, result.output))
455
456   os.chmod(constants.SSL_CERT_FILE, 0400)
457
458   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
459
460   if result.failed:
461     raise errors.OpExecError("Could not start the node daemon, command %s"
462                              " had exitcode %s and error %s" %
463                              (result.cmd, result.exit_code, result.output))
464
465
466 def _CheckInstanceBridgesExist(instance):
467   """Check that the brigdes needed by an instance exist.
468
469   """
470   # check bridges existance
471   brlist = [nic.bridge for nic in instance.nics]
472   if not rpc.call_bridges_exist(instance.primary_node, brlist):
473     raise errors.OpPrereqError("one or more target bridges %s does not"
474                                " exist on destination node '%s'" %
475                                (brlist, instance.primary_node))
476
477
478 class LUInitCluster(LogicalUnit):
479   """Initialise the cluster.
480
481   """
482   HPATH = "cluster-init"
483   HTYPE = constants.HTYPE_CLUSTER
484   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
485               "def_bridge", "master_netdev"]
486   REQ_CLUSTER = False
487
488   def BuildHooksEnv(self):
489     """Build hooks env.
490
491     Notes: Since we don't require a cluster, we must manually add
492     ourselves in the post-run node list.
493
494     """
495     env = {"OP_TARGET": self.op.cluster_name}
496     return env, [], [self.hostname.name]
497
498   def CheckPrereq(self):
499     """Verify that the passed name is a valid one.
500
501     """
502     if config.ConfigWriter.IsCluster():
503       raise errors.OpPrereqError("Cluster is already initialised")
504
505     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
506       if not os.path.exists(constants.VNC_PASSWORD_FILE):
507         raise errors.OpPrereqError("Please prepare the cluster VNC"
508                                    "password file %s" %
509                                    constants.VNC_PASSWORD_FILE)
510
511     self.hostname = hostname = utils.HostInfo()
512
513     if hostname.ip.startswith("127."):
514       raise errors.OpPrereqError("This host's IP resolves to the private"
515                                  " range (%s). Please fix DNS or %s." %
516                                  (hostname.ip, constants.ETC_HOSTS))
517
518     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
519                          source=constants.LOCALHOST_IP_ADDRESS):
520       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
521                                  " to %s,\nbut this ip address does not"
522                                  " belong to this host."
523                                  " Aborting." % hostname.ip)
524
525     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
526
527     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
528                      timeout=5):
529       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
530
531     secondary_ip = getattr(self.op, "secondary_ip", None)
532     if secondary_ip and not utils.IsValidIP(secondary_ip):
533       raise errors.OpPrereqError("Invalid secondary ip given")
534     if (secondary_ip and
535         secondary_ip != hostname.ip and
536         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
537                            source=constants.LOCALHOST_IP_ADDRESS))):
538       raise errors.OpPrereqError("You gave %s as secondary IP,"
539                                  " but it does not belong to this host." %
540                                  secondary_ip)
541     self.secondary_ip = secondary_ip
542
543     # checks presence of the volume group given
544     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
545
546     if vgstatus:
547       raise errors.OpPrereqError("Error: %s" % vgstatus)
548
549     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
550                     self.op.mac_prefix):
551       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
552                                  self.op.mac_prefix)
553
554     if self.op.hypervisor_type not in constants.HYPER_TYPES:
555       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
556                                  self.op.hypervisor_type)
557
558     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
559     if result.failed:
560       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
561                                  (self.op.master_netdev,
562                                   result.output.strip()))
563
564     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
565             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
566       raise errors.OpPrereqError("Init.d script '%s' missing or not"
567                                  " executable." % constants.NODE_INITD_SCRIPT)
568
569   def Exec(self, feedback_fn):
570     """Initialize the cluster.
571
572     """
573     clustername = self.clustername
574     hostname = self.hostname
575
576     # set up the simple store
577     self.sstore = ss = ssconf.SimpleStore()
578     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
579     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
580     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
581     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
582     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
583
584     # set up the inter-node password and certificate
585     _InitGanetiServerSetup(ss)
586
587     # start the master ip
588     rpc.call_node_start_master(hostname.name)
589
590     # set up ssh config and /etc/hosts
591     f = open(constants.SSH_HOST_RSA_PUB, 'r')
592     try:
593       sshline = f.read()
594     finally:
595       f.close()
596     sshkey = sshline.split(" ")[1]
597
598     _AddHostToEtcHosts(hostname.name)
599
600     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
601
602     _InitSSHSetup(hostname.name)
603
604     # init of cluster config file
605     self.cfg = cfgw = config.ConfigWriter()
606     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
607                     sshkey, self.op.mac_prefix,
608                     self.op.vg_name, self.op.def_bridge)
609
610
611 class LUDestroyCluster(NoHooksLU):
612   """Logical unit for destroying the cluster.
613
614   """
615   _OP_REQP = []
616
617   def CheckPrereq(self):
618     """Check prerequisites.
619
620     This checks whether the cluster is empty.
621
622     Any errors are signalled by raising errors.OpPrereqError.
623
624     """
625     master = self.sstore.GetMasterNode()
626
627     nodelist = self.cfg.GetNodeList()
628     if len(nodelist) != 1 or nodelist[0] != master:
629       raise errors.OpPrereqError("There are still %d node(s) in"
630                                  " this cluster." % (len(nodelist) - 1))
631     instancelist = self.cfg.GetInstanceList()
632     if instancelist:
633       raise errors.OpPrereqError("There are still %d instance(s) in"
634                                  " this cluster." % len(instancelist))
635
636   def Exec(self, feedback_fn):
637     """Destroys the cluster.
638
639     """
640     master = self.sstore.GetMasterNode()
641     if not rpc.call_node_stop_master(master):
642       raise errors.OpExecError("Could not disable the master role")
643     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
644     utils.CreateBackup(priv_key)
645     utils.CreateBackup(pub_key)
646     rpc.call_node_leave_cluster(master)
647
648
649 class LUVerifyCluster(NoHooksLU):
650   """Verifies the cluster status.
651
652   """
653   _OP_REQP = ["skip_checks"]
654
655   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
656                   remote_version, feedback_fn):
657     """Run multiple tests against a node.
658
659     Test list:
660       - compares ganeti version
661       - checks vg existance and size > 20G
662       - checks config file checksum
663       - checks ssh to other nodes
664
665     Args:
666       node: name of the node to check
667       file_list: required list of files
668       local_cksum: dictionary of local files and their checksums
669
670     """
671     # compares ganeti version
672     local_version = constants.PROTOCOL_VERSION
673     if not remote_version:
674       feedback_fn("  - ERROR: connection to %s failed" % (node))
675       return True
676
677     if local_version != remote_version:
678       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
679                       (local_version, node, remote_version))
680       return True
681
682     # checks vg existance and size > 20G
683
684     bad = False
685     if not vglist:
686       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
687                       (node,))
688       bad = True
689     else:
690       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
691       if vgstatus:
692         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
693         bad = True
694
695     # checks config file checksum
696     # checks ssh to any
697
698     if 'filelist' not in node_result:
699       bad = True
700       feedback_fn("  - ERROR: node hasn't returned file checksum data")
701     else:
702       remote_cksum = node_result['filelist']
703       for file_name in file_list:
704         if file_name not in remote_cksum:
705           bad = True
706           feedback_fn("  - ERROR: file '%s' missing" % file_name)
707         elif remote_cksum[file_name] != local_cksum[file_name]:
708           bad = True
709           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
710
711     if 'nodelist' not in node_result:
712       bad = True
713       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
714     else:
715       if node_result['nodelist']:
716         bad = True
717         for node in node_result['nodelist']:
718           feedback_fn("  - ERROR: communication with node '%s': %s" %
719                           (node, node_result['nodelist'][node]))
720     hyp_result = node_result.get('hypervisor', None)
721     if hyp_result is not None:
722       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
723     return bad
724
725   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
726                       node_instance, feedback_fn):
727     """Verify an instance.
728
729     This function checks to see if the required block devices are
730     available on the instance's node.
731
732     """
733     bad = False
734
735     node_current = instanceconfig.primary_node
736
737     node_vol_should = {}
738     instanceconfig.MapLVsByNode(node_vol_should)
739
740     for node in node_vol_should:
741       for volume in node_vol_should[node]:
742         if node not in node_vol_is or volume not in node_vol_is[node]:
743           feedback_fn("  - ERROR: volume %s missing on node %s" %
744                           (volume, node))
745           bad = True
746
747     if not instanceconfig.status == 'down':
748       if (node_current not in node_instance or
749           not instance in node_instance[node_current]):
750         feedback_fn("  - ERROR: instance %s not running on node %s" %
751                         (instance, node_current))
752         bad = True
753
754     for node in node_instance:
755       if (not node == node_current):
756         if instance in node_instance[node]:
757           feedback_fn("  - ERROR: instance %s should not run on node %s" %
758                           (instance, node))
759           bad = True
760
761     return bad
762
763   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
764     """Verify if there are any unknown volumes in the cluster.
765
766     The .os, .swap and backup volumes are ignored. All other volumes are
767     reported as unknown.
768
769     """
770     bad = False
771
772     for node in node_vol_is:
773       for volume in node_vol_is[node]:
774         if node not in node_vol_should or volume not in node_vol_should[node]:
775           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
776                       (volume, node))
777           bad = True
778     return bad
779
780   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
781     """Verify the list of running instances.
782
783     This checks what instances are running but unknown to the cluster.
784
785     """
786     bad = False
787     for node in node_instance:
788       for runninginstance in node_instance[node]:
789         if runninginstance not in instancelist:
790           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
791                           (runninginstance, node))
792           bad = True
793     return bad
794
795   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
796     """Verify N+1 Memory Resilience.
797
798     Check that if one single node dies we can still start all the instances it
799     was primary for.
800
801     """
802     bad = False
803
804     for node, nodeinfo in node_info.iteritems():
805       # This code checks that every node which is now listed as secondary has
806       # enough memory to host all instances it is supposed to should a single
807       # other node in the cluster fail.
808       # FIXME: not ready for failover to an arbitrary node
809       # FIXME: does not support file-backed instances
810       # WARNING: we currently take into account down instances as well as up
811       # ones, considering that even if they're down someone might want to start
812       # them even in the event of a node failure.
813       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
814         needed_mem = 0
815         for instance in instances:
816           needed_mem += instance_cfg[instance].memory
817         if nodeinfo['mfree'] < needed_mem:
818           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
819                       " failovers should node %s fail" % (node, prinode))
820           bad = True
821     return bad
822
823   def CheckPrereq(self):
824     """Check prerequisites.
825
826     Transform the list of checks we're going to skip into a set and check that
827     all its members are valid.
828
829     """
830     self.skip_set = frozenset(self.op.skip_checks)
831     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
832       raise errors.OpPrereqError("Invalid checks to be skipped specified")
833
834   def Exec(self, feedback_fn):
835     """Verify integrity of cluster, performing various test on nodes.
836
837     """
838     bad = False
839     feedback_fn("* Verifying global settings")
840     for msg in self.cfg.VerifyConfig():
841       feedback_fn("  - ERROR: %s" % msg)
842
843     vg_name = self.cfg.GetVGName()
844     nodelist = utils.NiceSort(self.cfg.GetNodeList())
845     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
846     i_non_redundant = [] # Non redundant instances
847     node_volume = {}
848     node_instance = {}
849     node_info = {}
850     instance_cfg = {}
851
852     # FIXME: verify OS list
853     # do local checksums
854     file_names = list(self.sstore.GetFileList())
855     file_names.append(constants.SSL_CERT_FILE)
856     file_names.append(constants.CLUSTER_CONF_FILE)
857     local_checksums = utils.FingerprintFiles(file_names)
858
859     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
860     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
861     all_instanceinfo = rpc.call_instance_list(nodelist)
862     all_vglist = rpc.call_vg_list(nodelist)
863     node_verify_param = {
864       'filelist': file_names,
865       'nodelist': nodelist,
866       'hypervisor': None,
867       }
868     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
869     all_rversion = rpc.call_version(nodelist)
870     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
871
872     for node in nodelist:
873       feedback_fn("* Verifying node %s" % node)
874       result = self._VerifyNode(node, file_names, local_checksums,
875                                 all_vglist[node], all_nvinfo[node],
876                                 all_rversion[node], feedback_fn)
877       bad = bad or result
878
879       # node_volume
880       volumeinfo = all_volumeinfo[node]
881
882       if isinstance(volumeinfo, basestring):
883         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
884                     (node, volumeinfo[-400:].encode('string_escape')))
885         bad = True
886         node_volume[node] = {}
887       elif not isinstance(volumeinfo, dict):
888         feedback_fn("  - ERROR: connection to %s failed" % (node,))
889         bad = True
890         continue
891       else:
892         node_volume[node] = volumeinfo
893
894       # node_instance
895       nodeinstance = all_instanceinfo[node]
896       if type(nodeinstance) != list:
897         feedback_fn("  - ERROR: connection to %s failed" % (node,))
898         bad = True
899         continue
900
901       node_instance[node] = nodeinstance
902
903       # node_info
904       nodeinfo = all_ninfo[node]
905       if not isinstance(nodeinfo, dict):
906         feedback_fn("  - ERROR: connection to %s failed" % (node,))
907         bad = True
908         continue
909
910       try:
911         node_info[node] = {
912           "mfree": int(nodeinfo['memory_free']),
913           "dfree": int(nodeinfo['vg_free']),
914           "pinst": [],
915           "sinst": [],
916           # dictionary holding all instances this node is secondary for,
917           # grouped by their primary node. Each key is a cluster node, and each
918           # value is a list of instances which have the key as primary and the
919           # current node as secondary.  this is handy to calculate N+1 memory
920           # availability if you can only failover from a primary to its
921           # secondary.
922           "sinst-by-pnode": {},
923         }
924       except ValueError:
925         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
926         bad = True
927         continue
928
929     node_vol_should = {}
930
931     for instance in instancelist:
932       feedback_fn("* Verifying instance %s" % instance)
933       inst_config = self.cfg.GetInstanceInfo(instance)
934       result =  self._VerifyInstance(instance, inst_config, node_volume,
935                                      node_instance, feedback_fn)
936       bad = bad or result
937
938       inst_config.MapLVsByNode(node_vol_should)
939
940       instance_cfg[instance] = inst_config
941
942       pnode = inst_config.primary_node
943       if pnode in node_info:
944         node_info[pnode]['pinst'].append(instance)
945       else:
946         feedback_fn("  - ERROR: instance %s, connection to primary node"
947                     " %s failed" % (instance, pnode))
948         bad = True
949
950       # If the instance is non-redundant we cannot survive losing its primary
951       # node, so we are not N+1 compliant. On the other hand we have no disk
952       # templates with more than one secondary so that situation is not well
953       # supported either.
954       # FIXME: does not support file-backed instances
955       if len(inst_config.secondary_nodes) == 0:
956         i_non_redundant.append(instance)
957       elif len(inst_config.secondary_nodes) > 1:
958         feedback_fn("  - WARNING: multiple secondaries for instance %s"
959                     % instance)
960
961       for snode in inst_config.secondary_nodes:
962         if snode in node_info:
963           node_info[snode]['sinst'].append(instance)
964           if pnode not in node_info[snode]['sinst-by-pnode']:
965             node_info[snode]['sinst-by-pnode'][pnode] = []
966           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
967         else:
968           feedback_fn("  - ERROR: instance %s, connection to secondary node"
969                       " %s failed" % (instance, snode))
970
971     feedback_fn("* Verifying orphan volumes")
972     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
973                                        feedback_fn)
974     bad = bad or result
975
976     feedback_fn("* Verifying remaining instances")
977     result = self._VerifyOrphanInstances(instancelist, node_instance,
978                                          feedback_fn)
979     bad = bad or result
980
981     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
982       feedback_fn("* Verifying N+1 Memory redundancy")
983       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
984       bad = bad or result
985
986     feedback_fn("* Other Notes")
987     if i_non_redundant:
988       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
989                   % len(i_non_redundant))
990
991     return int(bad)
992
993
994 class LUVerifyDisks(NoHooksLU):
995   """Verifies the cluster disks status.
996
997   """
998   _OP_REQP = []
999
1000   def CheckPrereq(self):
1001     """Check prerequisites.
1002
1003     This has no prerequisites.
1004
1005     """
1006     pass
1007
1008   def Exec(self, feedback_fn):
1009     """Verify integrity of cluster disks.
1010
1011     """
1012     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1013
1014     vg_name = self.cfg.GetVGName()
1015     nodes = utils.NiceSort(self.cfg.GetNodeList())
1016     instances = [self.cfg.GetInstanceInfo(name)
1017                  for name in self.cfg.GetInstanceList()]
1018
1019     nv_dict = {}
1020     for inst in instances:
1021       inst_lvs = {}
1022       if (inst.status != "up" or
1023           inst.disk_template not in constants.DTS_NET_MIRROR):
1024         continue
1025       inst.MapLVsByNode(inst_lvs)
1026       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1027       for node, vol_list in inst_lvs.iteritems():
1028         for vol in vol_list:
1029           nv_dict[(node, vol)] = inst
1030
1031     if not nv_dict:
1032       return result
1033
1034     node_lvs = rpc.call_volume_list(nodes, vg_name)
1035
1036     to_act = set()
1037     for node in nodes:
1038       # node_volume
1039       lvs = node_lvs[node]
1040
1041       if isinstance(lvs, basestring):
1042         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
1043         res_nlvm[node] = lvs
1044       elif not isinstance(lvs, dict):
1045         logger.Info("connection to node %s failed or invalid data returned" %
1046                     (node,))
1047         res_nodes.append(node)
1048         continue
1049
1050       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1051         inst = nv_dict.pop((node, lv_name), None)
1052         if (not lv_online and inst is not None
1053             and inst.name not in res_instances):
1054           res_instances.append(inst.name)
1055
1056     # any leftover items in nv_dict are missing LVs, let's arrange the
1057     # data better
1058     for key, inst in nv_dict.iteritems():
1059       if inst.name not in res_missing:
1060         res_missing[inst.name] = []
1061       res_missing[inst.name].append(key)
1062
1063     return result
1064
1065
1066 class LURenameCluster(LogicalUnit):
1067   """Rename the cluster.
1068
1069   """
1070   HPATH = "cluster-rename"
1071   HTYPE = constants.HTYPE_CLUSTER
1072   _OP_REQP = ["name"]
1073
1074   def BuildHooksEnv(self):
1075     """Build hooks env.
1076
1077     """
1078     env = {
1079       "OP_TARGET": self.sstore.GetClusterName(),
1080       "NEW_NAME": self.op.name,
1081       }
1082     mn = self.sstore.GetMasterNode()
1083     return env, [mn], [mn]
1084
1085   def CheckPrereq(self):
1086     """Verify that the passed name is a valid one.
1087
1088     """
1089     hostname = utils.HostInfo(self.op.name)
1090
1091     new_name = hostname.name
1092     self.ip = new_ip = hostname.ip
1093     old_name = self.sstore.GetClusterName()
1094     old_ip = self.sstore.GetMasterIP()
1095     if new_name == old_name and new_ip == old_ip:
1096       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1097                                  " cluster has changed")
1098     if new_ip != old_ip:
1099       result = utils.RunCmd(["fping", "-q", new_ip])
1100       if not result.failed:
1101         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1102                                    " reachable on the network. Aborting." %
1103                                    new_ip)
1104
1105     self.op.name = new_name
1106
1107   def Exec(self, feedback_fn):
1108     """Rename the cluster.
1109
1110     """
1111     clustername = self.op.name
1112     ip = self.ip
1113     ss = self.sstore
1114
1115     # shutdown the master IP
1116     master = ss.GetMasterNode()
1117     if not rpc.call_node_stop_master(master):
1118       raise errors.OpExecError("Could not disable the master role")
1119
1120     try:
1121       # modify the sstore
1122       ss.SetKey(ss.SS_MASTER_IP, ip)
1123       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1124
1125       # Distribute updated ss config to all nodes
1126       myself = self.cfg.GetNodeInfo(master)
1127       dist_nodes = self.cfg.GetNodeList()
1128       if myself.name in dist_nodes:
1129         dist_nodes.remove(myself.name)
1130
1131       logger.Debug("Copying updated ssconf data to all nodes")
1132       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1133         fname = ss.KeyToFilename(keyname)
1134         result = rpc.call_upload_file(dist_nodes, fname)
1135         for to_node in dist_nodes:
1136           if not result[to_node]:
1137             logger.Error("copy of file %s to node %s failed" %
1138                          (fname, to_node))
1139     finally:
1140       if not rpc.call_node_start_master(master):
1141         logger.Error("Could not re-enable the master role on the master,"
1142                      " please restart manually.")
1143
1144
1145 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1146   """Sleep and poll for an instance's disk to sync.
1147
1148   """
1149   if not instance.disks:
1150     return True
1151
1152   if not oneshot:
1153     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1154
1155   node = instance.primary_node
1156
1157   for dev in instance.disks:
1158     cfgw.SetDiskID(dev, node)
1159
1160   retries = 0
1161   while True:
1162     max_time = 0
1163     done = True
1164     cumul_degraded = False
1165     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1166     if not rstats:
1167       proc.LogWarning("Can't get any data from node %s" % node)
1168       retries += 1
1169       if retries >= 10:
1170         raise errors.RemoteError("Can't contact node %s for mirror data,"
1171                                  " aborting." % node)
1172       time.sleep(6)
1173       continue
1174     retries = 0
1175     for i in range(len(rstats)):
1176       mstat = rstats[i]
1177       if mstat is None:
1178         proc.LogWarning("Can't compute data for node %s/%s" %
1179                         (node, instance.disks[i].iv_name))
1180         continue
1181       # we ignore the ldisk parameter
1182       perc_done, est_time, is_degraded, _ = mstat
1183       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1184       if perc_done is not None:
1185         done = False
1186         if est_time is not None:
1187           rem_time = "%d estimated seconds remaining" % est_time
1188           max_time = est_time
1189         else:
1190           rem_time = "no time estimate"
1191         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1192                      (instance.disks[i].iv_name, perc_done, rem_time))
1193     if done or oneshot:
1194       break
1195
1196     if unlock:
1197       utils.Unlock('cmd')
1198     try:
1199       time.sleep(min(60, max_time))
1200     finally:
1201       if unlock:
1202         utils.Lock('cmd')
1203
1204   if done:
1205     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1206   return not cumul_degraded
1207
1208
1209 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1210   """Check that mirrors are not degraded.
1211
1212   The ldisk parameter, if True, will change the test from the
1213   is_degraded attribute (which represents overall non-ok status for
1214   the device(s)) to the ldisk (representing the local storage status).
1215
1216   """
1217   cfgw.SetDiskID(dev, node)
1218   if ldisk:
1219     idx = 6
1220   else:
1221     idx = 5
1222
1223   result = True
1224   if on_primary or dev.AssembleOnSecondary():
1225     rstats = rpc.call_blockdev_find(node, dev)
1226     if not rstats:
1227       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1228       result = False
1229     else:
1230       result = result and (not rstats[idx])
1231   if dev.children:
1232     for child in dev.children:
1233       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1234
1235   return result
1236
1237
1238 class LUDiagnoseOS(NoHooksLU):
1239   """Logical unit for OS diagnose/query.
1240
1241   """
1242   _OP_REQP = []
1243
1244   def CheckPrereq(self):
1245     """Check prerequisites.
1246
1247     This always succeeds, since this is a pure query LU.
1248
1249     """
1250     return
1251
1252   def Exec(self, feedback_fn):
1253     """Compute the list of OSes.
1254
1255     """
1256     node_list = self.cfg.GetNodeList()
1257     node_data = rpc.call_os_diagnose(node_list)
1258     if node_data == False:
1259       raise errors.OpExecError("Can't gather the list of OSes")
1260     return node_data
1261
1262
1263 class LURemoveNode(LogicalUnit):
1264   """Logical unit for removing a node.
1265
1266   """
1267   HPATH = "node-remove"
1268   HTYPE = constants.HTYPE_NODE
1269   _OP_REQP = ["node_name"]
1270
1271   def BuildHooksEnv(self):
1272     """Build hooks env.
1273
1274     This doesn't run on the target node in the pre phase as a failed
1275     node would not allows itself to run.
1276
1277     """
1278     env = {
1279       "OP_TARGET": self.op.node_name,
1280       "NODE_NAME": self.op.node_name,
1281       }
1282     all_nodes = self.cfg.GetNodeList()
1283     all_nodes.remove(self.op.node_name)
1284     return env, all_nodes, all_nodes
1285
1286   def CheckPrereq(self):
1287     """Check prerequisites.
1288
1289     This checks:
1290      - the node exists in the configuration
1291      - it does not have primary or secondary instances
1292      - it's not the master
1293
1294     Any errors are signalled by raising errors.OpPrereqError.
1295
1296     """
1297     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1298     if node is None:
1299       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1300
1301     instance_list = self.cfg.GetInstanceList()
1302
1303     masternode = self.sstore.GetMasterNode()
1304     if node.name == masternode:
1305       raise errors.OpPrereqError("Node is the master node,"
1306                                  " you need to failover first.")
1307
1308     for instance_name in instance_list:
1309       instance = self.cfg.GetInstanceInfo(instance_name)
1310       if node.name == instance.primary_node:
1311         raise errors.OpPrereqError("Instance %s still running on the node,"
1312                                    " please remove first." % instance_name)
1313       if node.name in instance.secondary_nodes:
1314         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1315                                    " please remove first." % instance_name)
1316     self.op.node_name = node.name
1317     self.node = node
1318
1319   def Exec(self, feedback_fn):
1320     """Removes the node from the cluster.
1321
1322     """
1323     node = self.node
1324     logger.Info("stopping the node daemon and removing configs from node %s" %
1325                 node.name)
1326
1327     rpc.call_node_leave_cluster(node.name)
1328
1329     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1330
1331     logger.Info("Removing node %s from config" % node.name)
1332
1333     self.cfg.RemoveNode(node.name)
1334
1335     _RemoveHostFromEtcHosts(node.name)
1336
1337
1338 class LUQueryNodes(NoHooksLU):
1339   """Logical unit for querying nodes.
1340
1341   """
1342   _OP_REQP = ["output_fields", "names"]
1343
1344   def CheckPrereq(self):
1345     """Check prerequisites.
1346
1347     This checks that the fields required are valid output fields.
1348
1349     """
1350     self.dynamic_fields = frozenset(["dtotal", "dfree",
1351                                      "mtotal", "mnode", "mfree",
1352                                      "bootid"])
1353
1354     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1355                                "pinst_list", "sinst_list",
1356                                "pip", "sip"],
1357                        dynamic=self.dynamic_fields,
1358                        selected=self.op.output_fields)
1359
1360     self.wanted = _GetWantedNodes(self, self.op.names)
1361
1362   def Exec(self, feedback_fn):
1363     """Computes the list of nodes and their attributes.
1364
1365     """
1366     nodenames = self.wanted
1367     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1368
1369     # begin data gathering
1370
1371     if self.dynamic_fields.intersection(self.op.output_fields):
1372       live_data = {}
1373       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1374       for name in nodenames:
1375         nodeinfo = node_data.get(name, None)
1376         if nodeinfo:
1377           live_data[name] = {
1378             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1379             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1380             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1381             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1382             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1383             "bootid": nodeinfo['bootid'],
1384             }
1385         else:
1386           live_data[name] = {}
1387     else:
1388       live_data = dict.fromkeys(nodenames, {})
1389
1390     node_to_primary = dict([(name, set()) for name in nodenames])
1391     node_to_secondary = dict([(name, set()) for name in nodenames])
1392
1393     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1394                              "sinst_cnt", "sinst_list"))
1395     if inst_fields & frozenset(self.op.output_fields):
1396       instancelist = self.cfg.GetInstanceList()
1397
1398       for instance_name in instancelist:
1399         inst = self.cfg.GetInstanceInfo(instance_name)
1400         if inst.primary_node in node_to_primary:
1401           node_to_primary[inst.primary_node].add(inst.name)
1402         for secnode in inst.secondary_nodes:
1403           if secnode in node_to_secondary:
1404             node_to_secondary[secnode].add(inst.name)
1405
1406     # end data gathering
1407
1408     output = []
1409     for node in nodelist:
1410       node_output = []
1411       for field in self.op.output_fields:
1412         if field == "name":
1413           val = node.name
1414         elif field == "pinst_list":
1415           val = list(node_to_primary[node.name])
1416         elif field == "sinst_list":
1417           val = list(node_to_secondary[node.name])
1418         elif field == "pinst_cnt":
1419           val = len(node_to_primary[node.name])
1420         elif field == "sinst_cnt":
1421           val = len(node_to_secondary[node.name])
1422         elif field == "pip":
1423           val = node.primary_ip
1424         elif field == "sip":
1425           val = node.secondary_ip
1426         elif field in self.dynamic_fields:
1427           val = live_data[node.name].get(field, None)
1428         else:
1429           raise errors.ParameterError(field)
1430         node_output.append(val)
1431       output.append(node_output)
1432
1433     return output
1434
1435
1436 class LUQueryNodeVolumes(NoHooksLU):
1437   """Logical unit for getting volumes on node(s).
1438
1439   """
1440   _OP_REQP = ["nodes", "output_fields"]
1441
1442   def CheckPrereq(self):
1443     """Check prerequisites.
1444
1445     This checks that the fields required are valid output fields.
1446
1447     """
1448     self.nodes = _GetWantedNodes(self, self.op.nodes)
1449
1450     _CheckOutputFields(static=["node"],
1451                        dynamic=["phys", "vg", "name", "size", "instance"],
1452                        selected=self.op.output_fields)
1453
1454
1455   def Exec(self, feedback_fn):
1456     """Computes the list of nodes and their attributes.
1457
1458     """
1459     nodenames = self.nodes
1460     volumes = rpc.call_node_volumes(nodenames)
1461
1462     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1463              in self.cfg.GetInstanceList()]
1464
1465     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1466
1467     output = []
1468     for node in nodenames:
1469       if node not in volumes or not volumes[node]:
1470         continue
1471
1472       node_vols = volumes[node][:]
1473       node_vols.sort(key=lambda vol: vol['dev'])
1474
1475       for vol in node_vols:
1476         node_output = []
1477         for field in self.op.output_fields:
1478           if field == "node":
1479             val = node
1480           elif field == "phys":
1481             val = vol['dev']
1482           elif field == "vg":
1483             val = vol['vg']
1484           elif field == "name":
1485             val = vol['name']
1486           elif field == "size":
1487             val = int(float(vol['size']))
1488           elif field == "instance":
1489             for inst in ilist:
1490               if node not in lv_by_node[inst]:
1491                 continue
1492               if vol['name'] in lv_by_node[inst][node]:
1493                 val = inst.name
1494                 break
1495             else:
1496               val = '-'
1497           else:
1498             raise errors.ParameterError(field)
1499           node_output.append(str(val))
1500
1501         output.append(node_output)
1502
1503     return output
1504
1505
1506 class LUAddNode(LogicalUnit):
1507   """Logical unit for adding node to the cluster.
1508
1509   """
1510   HPATH = "node-add"
1511   HTYPE = constants.HTYPE_NODE
1512   _OP_REQP = ["node_name"]
1513
1514   def BuildHooksEnv(self):
1515     """Build hooks env.
1516
1517     This will run on all nodes before, and on all nodes + the new node after.
1518
1519     """
1520     env = {
1521       "OP_TARGET": self.op.node_name,
1522       "NODE_NAME": self.op.node_name,
1523       "NODE_PIP": self.op.primary_ip,
1524       "NODE_SIP": self.op.secondary_ip,
1525       }
1526     nodes_0 = self.cfg.GetNodeList()
1527     nodes_1 = nodes_0 + [self.op.node_name, ]
1528     return env, nodes_0, nodes_1
1529
1530   def CheckPrereq(self):
1531     """Check prerequisites.
1532
1533     This checks:
1534      - the new node is not already in the config
1535      - it is resolvable
1536      - its parameters (single/dual homed) matches the cluster
1537
1538     Any errors are signalled by raising errors.OpPrereqError.
1539
1540     """
1541     node_name = self.op.node_name
1542     cfg = self.cfg
1543
1544     dns_data = utils.HostInfo(node_name)
1545
1546     node = dns_data.name
1547     primary_ip = self.op.primary_ip = dns_data.ip
1548     secondary_ip = getattr(self.op, "secondary_ip", None)
1549     if secondary_ip is None:
1550       secondary_ip = primary_ip
1551     if not utils.IsValidIP(secondary_ip):
1552       raise errors.OpPrereqError("Invalid secondary IP given")
1553     self.op.secondary_ip = secondary_ip
1554     node_list = cfg.GetNodeList()
1555     if node in node_list:
1556       raise errors.OpPrereqError("Node %s is already in the configuration"
1557                                  % node)
1558
1559     for existing_node_name in node_list:
1560       existing_node = cfg.GetNodeInfo(existing_node_name)
1561       if (existing_node.primary_ip == primary_ip or
1562           existing_node.secondary_ip == primary_ip or
1563           existing_node.primary_ip == secondary_ip or
1564           existing_node.secondary_ip == secondary_ip):
1565         raise errors.OpPrereqError("New node ip address(es) conflict with"
1566                                    " existing node %s" % existing_node.name)
1567
1568     # check that the type of the node (single versus dual homed) is the
1569     # same as for the master
1570     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1571     master_singlehomed = myself.secondary_ip == myself.primary_ip
1572     newbie_singlehomed = secondary_ip == primary_ip
1573     if master_singlehomed != newbie_singlehomed:
1574       if master_singlehomed:
1575         raise errors.OpPrereqError("The master has no private ip but the"
1576                                    " new node has one")
1577       else:
1578         raise errors.OpPrereqError("The master has a private ip but the"
1579                                    " new node doesn't have one")
1580
1581     # checks reachablity
1582     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1583       raise errors.OpPrereqError("Node not reachable by ping")
1584
1585     if not newbie_singlehomed:
1586       # check reachability from my secondary ip to newbie's secondary ip
1587       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1588                            source=myself.secondary_ip):
1589         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1590                                    " based ping to noded port")
1591
1592     self.new_node = objects.Node(name=node,
1593                                  primary_ip=primary_ip,
1594                                  secondary_ip=secondary_ip)
1595
1596     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1597       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1598         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1599                                    constants.VNC_PASSWORD_FILE)
1600
1601   def Exec(self, feedback_fn):
1602     """Adds the new node to the cluster.
1603
1604     """
1605     new_node = self.new_node
1606     node = new_node.name
1607
1608     # set up inter-node password and certificate and restarts the node daemon
1609     gntpass = self.sstore.GetNodeDaemonPassword()
1610     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1611       raise errors.OpExecError("ganeti password corruption detected")
1612     f = open(constants.SSL_CERT_FILE)
1613     try:
1614       gntpem = f.read(8192)
1615     finally:
1616       f.close()
1617     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1618     # so we use this to detect an invalid certificate; as long as the
1619     # cert doesn't contain this, the here-document will be correctly
1620     # parsed by the shell sequence below
1621     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1622       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1623     if not gntpem.endswith("\n"):
1624       raise errors.OpExecError("PEM must end with newline")
1625     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1626
1627     # and then connect with ssh to set password and start ganeti-noded
1628     # note that all the below variables are sanitized at this point,
1629     # either by being constants or by the checks above
1630     ss = self.sstore
1631     mycommand = ("umask 077 && "
1632                  "echo '%s' > '%s' && "
1633                  "cat > '%s' << '!EOF.' && \n"
1634                  "%s!EOF.\n%s restart" %
1635                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1636                   constants.SSL_CERT_FILE, gntpem,
1637                   constants.NODE_INITD_SCRIPT))
1638
1639     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1640     if result.failed:
1641       raise errors.OpExecError("Remote command on node %s, error: %s,"
1642                                " output: %s" %
1643                                (node, result.fail_reason, result.output))
1644
1645     # check connectivity
1646     time.sleep(4)
1647
1648     result = rpc.call_version([node])[node]
1649     if result:
1650       if constants.PROTOCOL_VERSION == result:
1651         logger.Info("communication to node %s fine, sw version %s match" %
1652                     (node, result))
1653       else:
1654         raise errors.OpExecError("Version mismatch master version %s,"
1655                                  " node version %s" %
1656                                  (constants.PROTOCOL_VERSION, result))
1657     else:
1658       raise errors.OpExecError("Cannot get version from the new node")
1659
1660     # setup ssh on node
1661     logger.Info("copy ssh key to node %s" % node)
1662     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1663     keyarray = []
1664     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1665                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1666                 priv_key, pub_key]
1667
1668     for i in keyfiles:
1669       f = open(i, 'r')
1670       try:
1671         keyarray.append(f.read())
1672       finally:
1673         f.close()
1674
1675     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1676                                keyarray[3], keyarray[4], keyarray[5])
1677
1678     if not result:
1679       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1680
1681     # Add node to our /etc/hosts, and add key to known_hosts
1682     _AddHostToEtcHosts(new_node.name)
1683
1684     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1685                       self.cfg.GetHostKey())
1686
1687     if new_node.secondary_ip != new_node.primary_ip:
1688       if not rpc.call_node_tcp_ping(new_node.name,
1689                                     constants.LOCALHOST_IP_ADDRESS,
1690                                     new_node.secondary_ip,
1691                                     constants.DEFAULT_NODED_PORT,
1692                                     10, False):
1693         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1694                                  " you gave (%s). Please fix and re-run this"
1695                                  " command." % new_node.secondary_ip)
1696
1697     success, msg = ssh.VerifyNodeHostname(node)
1698     if not success:
1699       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1700                                " than the one the resolver gives: %s."
1701                                " Please fix and re-run this command." %
1702                                (node, msg))
1703
1704     # Distribute updated /etc/hosts and known_hosts to all nodes,
1705     # including the node just added
1706     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1707     dist_nodes = self.cfg.GetNodeList() + [node]
1708     if myself.name in dist_nodes:
1709       dist_nodes.remove(myself.name)
1710
1711     logger.Debug("Copying hosts and known_hosts to all nodes")
1712     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1713       result = rpc.call_upload_file(dist_nodes, fname)
1714       for to_node in dist_nodes:
1715         if not result[to_node]:
1716           logger.Error("copy of file %s to node %s failed" %
1717                        (fname, to_node))
1718
1719     to_copy = ss.GetFileList()
1720     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1721       to_copy.append(constants.VNC_PASSWORD_FILE)
1722     for fname in to_copy:
1723       if not ssh.CopyFileToNode(node, fname):
1724         logger.Error("could not copy file %s to node %s" % (fname, node))
1725
1726     logger.Info("adding node %s to cluster.conf" % node)
1727     self.cfg.AddNode(new_node)
1728
1729
1730 class LUMasterFailover(LogicalUnit):
1731   """Failover the master node to the current node.
1732
1733   This is a special LU in that it must run on a non-master node.
1734
1735   """
1736   HPATH = "master-failover"
1737   HTYPE = constants.HTYPE_CLUSTER
1738   REQ_MASTER = False
1739   _OP_REQP = []
1740
1741   def BuildHooksEnv(self):
1742     """Build hooks env.
1743
1744     This will run on the new master only in the pre phase, and on all
1745     the nodes in the post phase.
1746
1747     """
1748     env = {
1749       "OP_TARGET": self.new_master,
1750       "NEW_MASTER": self.new_master,
1751       "OLD_MASTER": self.old_master,
1752       }
1753     return env, [self.new_master], self.cfg.GetNodeList()
1754
1755   def CheckPrereq(self):
1756     """Check prerequisites.
1757
1758     This checks that we are not already the master.
1759
1760     """
1761     self.new_master = utils.HostInfo().name
1762     self.old_master = self.sstore.GetMasterNode()
1763
1764     if self.old_master == self.new_master:
1765       raise errors.OpPrereqError("This commands must be run on the node"
1766                                  " where you want the new master to be."
1767                                  " %s is already the master" %
1768                                  self.old_master)
1769
1770   def Exec(self, feedback_fn):
1771     """Failover the master node.
1772
1773     This command, when run on a non-master node, will cause the current
1774     master to cease being master, and the non-master to become new
1775     master.
1776
1777     """
1778     #TODO: do not rely on gethostname returning the FQDN
1779     logger.Info("setting master to %s, old master: %s" %
1780                 (self.new_master, self.old_master))
1781
1782     if not rpc.call_node_stop_master(self.old_master):
1783       logger.Error("could disable the master role on the old master"
1784                    " %s, please disable manually" % self.old_master)
1785
1786     ss = self.sstore
1787     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1788     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1789                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1790       logger.Error("could not distribute the new simple store master file"
1791                    " to the other nodes, please check.")
1792
1793     if not rpc.call_node_start_master(self.new_master):
1794       logger.Error("could not start the master role on the new master"
1795                    " %s, please check" % self.new_master)
1796       feedback_fn("Error in activating the master IP on the new master,"
1797                   " please fix manually.")
1798
1799
1800
1801 class LUQueryClusterInfo(NoHooksLU):
1802   """Query cluster configuration.
1803
1804   """
1805   _OP_REQP = []
1806   REQ_MASTER = False
1807
1808   def CheckPrereq(self):
1809     """No prerequsites needed for this LU.
1810
1811     """
1812     pass
1813
1814   def Exec(self, feedback_fn):
1815     """Return cluster config.
1816
1817     """
1818     result = {
1819       "name": self.sstore.GetClusterName(),
1820       "software_version": constants.RELEASE_VERSION,
1821       "protocol_version": constants.PROTOCOL_VERSION,
1822       "config_version": constants.CONFIG_VERSION,
1823       "os_api_version": constants.OS_API_VERSION,
1824       "export_version": constants.EXPORT_VERSION,
1825       "master": self.sstore.GetMasterNode(),
1826       "architecture": (platform.architecture()[0], platform.machine()),
1827       }
1828
1829     return result
1830
1831
1832 class LUClusterCopyFile(NoHooksLU):
1833   """Copy file to cluster.
1834
1835   """
1836   _OP_REQP = ["nodes", "filename"]
1837
1838   def CheckPrereq(self):
1839     """Check prerequisites.
1840
1841     It should check that the named file exists and that the given list
1842     of nodes is valid.
1843
1844     """
1845     if not os.path.exists(self.op.filename):
1846       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1847
1848     self.nodes = _GetWantedNodes(self, self.op.nodes)
1849
1850   def Exec(self, feedback_fn):
1851     """Copy a file from master to some nodes.
1852
1853     Args:
1854       opts - class with options as members
1855       args - list containing a single element, the file name
1856     Opts used:
1857       nodes - list containing the name of target nodes; if empty, all nodes
1858
1859     """
1860     filename = self.op.filename
1861
1862     myname = utils.HostInfo().name
1863
1864     for node in self.nodes:
1865       if node == myname:
1866         continue
1867       if not ssh.CopyFileToNode(node, filename):
1868         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1869
1870
1871 class LUDumpClusterConfig(NoHooksLU):
1872   """Return a text-representation of the cluster-config.
1873
1874   """
1875   _OP_REQP = []
1876
1877   def CheckPrereq(self):
1878     """No prerequisites.
1879
1880     """
1881     pass
1882
1883   def Exec(self, feedback_fn):
1884     """Dump a representation of the cluster config to the standard output.
1885
1886     """
1887     return self.cfg.DumpConfig()
1888
1889
1890 class LURunClusterCommand(NoHooksLU):
1891   """Run a command on some nodes.
1892
1893   """
1894   _OP_REQP = ["command", "nodes"]
1895
1896   def CheckPrereq(self):
1897     """Check prerequisites.
1898
1899     It checks that the given list of nodes is valid.
1900
1901     """
1902     self.nodes = _GetWantedNodes(self, self.op.nodes)
1903
1904   def Exec(self, feedback_fn):
1905     """Run a command on some nodes.
1906
1907     """
1908     # put the master at the end of the nodes list
1909     master_node = self.sstore.GetMasterNode()
1910     if master_node in self.nodes:
1911       self.nodes.remove(master_node)
1912       self.nodes.append(master_node)
1913
1914     data = []
1915     for node in self.nodes:
1916       result = ssh.SSHCall(node, "root", self.op.command)
1917       data.append((node, result.output, result.exit_code))
1918
1919     return data
1920
1921
1922 class LUActivateInstanceDisks(NoHooksLU):
1923   """Bring up an instance's disks.
1924
1925   """
1926   _OP_REQP = ["instance_name"]
1927
1928   def CheckPrereq(self):
1929     """Check prerequisites.
1930
1931     This checks that the instance is in the cluster.
1932
1933     """
1934     instance = self.cfg.GetInstanceInfo(
1935       self.cfg.ExpandInstanceName(self.op.instance_name))
1936     if instance is None:
1937       raise errors.OpPrereqError("Instance '%s' not known" %
1938                                  self.op.instance_name)
1939     self.instance = instance
1940
1941
1942   def Exec(self, feedback_fn):
1943     """Activate the disks.
1944
1945     """
1946     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1947     if not disks_ok:
1948       raise errors.OpExecError("Cannot activate block devices")
1949
1950     return disks_info
1951
1952
1953 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1954   """Prepare the block devices for an instance.
1955
1956   This sets up the block devices on all nodes.
1957
1958   Args:
1959     instance: a ganeti.objects.Instance object
1960     ignore_secondaries: if true, errors on secondary nodes won't result
1961                         in an error return from the function
1962
1963   Returns:
1964     false if the operation failed
1965     list of (host, instance_visible_name, node_visible_name) if the operation
1966          suceeded with the mapping from node devices to instance devices
1967   """
1968   device_info = []
1969   disks_ok = True
1970   iname = instance.name
1971   # With the two passes mechanism we try to reduce the window of
1972   # opportunity for the race condition of switching DRBD to primary
1973   # before handshaking occured, but we do not eliminate it
1974
1975   # The proper fix would be to wait (with some limits) until the
1976   # connection has been made and drbd transitions from WFConnection
1977   # into any other network-connected state (Connected, SyncTarget,
1978   # SyncSource, etc.)
1979
1980   # 1st pass, assemble on all nodes in secondary mode
1981   for inst_disk in instance.disks:
1982     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1983       cfg.SetDiskID(node_disk, node)
1984       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1985       if not result:
1986         logger.Error("could not prepare block device %s on node %s"
1987                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1988         if not ignore_secondaries:
1989           disks_ok = False
1990
1991   # FIXME: race condition on drbd migration to primary
1992
1993   # 2nd pass, do only the primary node
1994   for inst_disk in instance.disks:
1995     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1996       if node != instance.primary_node:
1997         continue
1998       cfg.SetDiskID(node_disk, node)
1999       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
2000       if not result:
2001         logger.Error("could not prepare block device %s on node %s"
2002                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2003         disks_ok = False
2004     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2005
2006   # leave the disks configured for the primary node
2007   # this is a workaround that would be fixed better by
2008   # improving the logical/physical id handling
2009   for disk in instance.disks:
2010     cfg.SetDiskID(disk, instance.primary_node)
2011
2012   return disks_ok, device_info
2013
2014
2015 def _StartInstanceDisks(cfg, instance, force):
2016   """Start the disks of an instance.
2017
2018   """
2019   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2020                                            ignore_secondaries=force)
2021   if not disks_ok:
2022     _ShutdownInstanceDisks(instance, cfg)
2023     if force is not None and not force:
2024       logger.Error("If the message above refers to a secondary node,"
2025                    " you can retry the operation using '--force'.")
2026     raise errors.OpExecError("Disk consistency error")
2027
2028
2029 class LUDeactivateInstanceDisks(NoHooksLU):
2030   """Shutdown an instance's disks.
2031
2032   """
2033   _OP_REQP = ["instance_name"]
2034
2035   def CheckPrereq(self):
2036     """Check prerequisites.
2037
2038     This checks that the instance is in the cluster.
2039
2040     """
2041     instance = self.cfg.GetInstanceInfo(
2042       self.cfg.ExpandInstanceName(self.op.instance_name))
2043     if instance is None:
2044       raise errors.OpPrereqError("Instance '%s' not known" %
2045                                  self.op.instance_name)
2046     self.instance = instance
2047
2048   def Exec(self, feedback_fn):
2049     """Deactivate the disks
2050
2051     """
2052     instance = self.instance
2053     ins_l = rpc.call_instance_list([instance.primary_node])
2054     ins_l = ins_l[instance.primary_node]
2055     if not type(ins_l) is list:
2056       raise errors.OpExecError("Can't contact node '%s'" %
2057                                instance.primary_node)
2058
2059     if self.instance.name in ins_l:
2060       raise errors.OpExecError("Instance is running, can't shutdown"
2061                                " block devices.")
2062
2063     _ShutdownInstanceDisks(instance, self.cfg)
2064
2065
2066 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2067   """Shutdown block devices of an instance.
2068
2069   This does the shutdown on all nodes of the instance.
2070
2071   If the ignore_primary is false, errors on the primary node are
2072   ignored.
2073
2074   """
2075   result = True
2076   for disk in instance.disks:
2077     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2078       cfg.SetDiskID(top_disk, node)
2079       if not rpc.call_blockdev_shutdown(node, top_disk):
2080         logger.Error("could not shutdown block device %s on node %s" %
2081                      (disk.iv_name, node))
2082         if not ignore_primary or node != instance.primary_node:
2083           result = False
2084   return result
2085
2086
2087 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2088   """Checks if a node has enough free memory.
2089
2090   This function check if a given node has the needed amount of free
2091   memory. In case the node has less memory or we cannot get the
2092   information from the node, this function raise an OpPrereqError
2093   exception.
2094
2095   Args:
2096     - cfg: a ConfigWriter instance
2097     - node: the node name
2098     - reason: string to use in the error message
2099     - requested: the amount of memory in MiB
2100
2101   """
2102   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2103   if not nodeinfo or not isinstance(nodeinfo, dict):
2104     raise errors.OpPrereqError("Could not contact node %s for resource"
2105                              " information" % (node,))
2106
2107   free_mem = nodeinfo[node].get('memory_free')
2108   if not isinstance(free_mem, int):
2109     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2110                              " was '%s'" % (node, free_mem))
2111   if requested > free_mem:
2112     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2113                              " needed %s MiB, available %s MiB" %
2114                              (node, reason, requested, free_mem))
2115
2116
2117 class LUStartupInstance(LogicalUnit):
2118   """Starts an instance.
2119
2120   """
2121   HPATH = "instance-start"
2122   HTYPE = constants.HTYPE_INSTANCE
2123   _OP_REQP = ["instance_name", "force"]
2124
2125   def BuildHooksEnv(self):
2126     """Build hooks env.
2127
2128     This runs on master, primary and secondary nodes of the instance.
2129
2130     """
2131     env = {
2132       "FORCE": self.op.force,
2133       }
2134     env.update(_BuildInstanceHookEnvByObject(self.instance))
2135     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2136           list(self.instance.secondary_nodes))
2137     return env, nl, nl
2138
2139   def CheckPrereq(self):
2140     """Check prerequisites.
2141
2142     This checks that the instance is in the cluster.
2143
2144     """
2145     instance = self.cfg.GetInstanceInfo(
2146       self.cfg.ExpandInstanceName(self.op.instance_name))
2147     if instance is None:
2148       raise errors.OpPrereqError("Instance '%s' not known" %
2149                                  self.op.instance_name)
2150
2151     # check bridges existance
2152     _CheckInstanceBridgesExist(instance)
2153
2154     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2155                          "starting instance %s" % instance.name,
2156                          instance.memory)
2157
2158     self.instance = instance
2159     self.op.instance_name = instance.name
2160
2161   def Exec(self, feedback_fn):
2162     """Start the instance.
2163
2164     """
2165     instance = self.instance
2166     force = self.op.force
2167     extra_args = getattr(self.op, "extra_args", "")
2168
2169     self.cfg.MarkInstanceUp(instance.name)
2170
2171     node_current = instance.primary_node
2172
2173     _StartInstanceDisks(self.cfg, instance, force)
2174
2175     if not rpc.call_instance_start(node_current, instance, extra_args):
2176       _ShutdownInstanceDisks(instance, self.cfg)
2177       raise errors.OpExecError("Could not start instance")
2178
2179
2180 class LURebootInstance(LogicalUnit):
2181   """Reboot an instance.
2182
2183   """
2184   HPATH = "instance-reboot"
2185   HTYPE = constants.HTYPE_INSTANCE
2186   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2187
2188   def BuildHooksEnv(self):
2189     """Build hooks env.
2190
2191     This runs on master, primary and secondary nodes of the instance.
2192
2193     """
2194     env = {
2195       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2196       }
2197     env.update(_BuildInstanceHookEnvByObject(self.instance))
2198     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2199           list(self.instance.secondary_nodes))
2200     return env, nl, nl
2201
2202   def CheckPrereq(self):
2203     """Check prerequisites.
2204
2205     This checks that the instance is in the cluster.
2206
2207     """
2208     instance = self.cfg.GetInstanceInfo(
2209       self.cfg.ExpandInstanceName(self.op.instance_name))
2210     if instance is None:
2211       raise errors.OpPrereqError("Instance '%s' not known" %
2212                                  self.op.instance_name)
2213
2214     # check bridges existance
2215     _CheckInstanceBridgesExist(instance)
2216
2217     self.instance = instance
2218     self.op.instance_name = instance.name
2219
2220   def Exec(self, feedback_fn):
2221     """Reboot the instance.
2222
2223     """
2224     instance = self.instance
2225     ignore_secondaries = self.op.ignore_secondaries
2226     reboot_type = self.op.reboot_type
2227     extra_args = getattr(self.op, "extra_args", "")
2228
2229     node_current = instance.primary_node
2230
2231     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2232                            constants.INSTANCE_REBOOT_HARD,
2233                            constants.INSTANCE_REBOOT_FULL]:
2234       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2235                                   (constants.INSTANCE_REBOOT_SOFT,
2236                                    constants.INSTANCE_REBOOT_HARD,
2237                                    constants.INSTANCE_REBOOT_FULL))
2238
2239     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2240                        constants.INSTANCE_REBOOT_HARD]:
2241       if not rpc.call_instance_reboot(node_current, instance,
2242                                       reboot_type, extra_args):
2243         raise errors.OpExecError("Could not reboot instance")
2244     else:
2245       if not rpc.call_instance_shutdown(node_current, instance):
2246         raise errors.OpExecError("could not shutdown instance for full reboot")
2247       _ShutdownInstanceDisks(instance, self.cfg)
2248       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2249       if not rpc.call_instance_start(node_current, instance, extra_args):
2250         _ShutdownInstanceDisks(instance, self.cfg)
2251         raise errors.OpExecError("Could not start instance for full reboot")
2252
2253     self.cfg.MarkInstanceUp(instance.name)
2254
2255
2256 class LUShutdownInstance(LogicalUnit):
2257   """Shutdown an instance.
2258
2259   """
2260   HPATH = "instance-stop"
2261   HTYPE = constants.HTYPE_INSTANCE
2262   _OP_REQP = ["instance_name"]
2263
2264   def BuildHooksEnv(self):
2265     """Build hooks env.
2266
2267     This runs on master, primary and secondary nodes of the instance.
2268
2269     """
2270     env = _BuildInstanceHookEnvByObject(self.instance)
2271     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2272           list(self.instance.secondary_nodes))
2273     return env, nl, nl
2274
2275   def CheckPrereq(self):
2276     """Check prerequisites.
2277
2278     This checks that the instance is in the cluster.
2279
2280     """
2281     instance = self.cfg.GetInstanceInfo(
2282       self.cfg.ExpandInstanceName(self.op.instance_name))
2283     if instance is None:
2284       raise errors.OpPrereqError("Instance '%s' not known" %
2285                                  self.op.instance_name)
2286     self.instance = instance
2287
2288   def Exec(self, feedback_fn):
2289     """Shutdown the instance.
2290
2291     """
2292     instance = self.instance
2293     node_current = instance.primary_node
2294     self.cfg.MarkInstanceDown(instance.name)
2295     if not rpc.call_instance_shutdown(node_current, instance):
2296       logger.Error("could not shutdown instance")
2297
2298     _ShutdownInstanceDisks(instance, self.cfg)
2299
2300
2301 class LUReinstallInstance(LogicalUnit):
2302   """Reinstall an instance.
2303
2304   """
2305   HPATH = "instance-reinstall"
2306   HTYPE = constants.HTYPE_INSTANCE
2307   _OP_REQP = ["instance_name"]
2308
2309   def BuildHooksEnv(self):
2310     """Build hooks env.
2311
2312     This runs on master, primary and secondary nodes of the instance.
2313
2314     """
2315     env = _BuildInstanceHookEnvByObject(self.instance)
2316     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2317           list(self.instance.secondary_nodes))
2318     return env, nl, nl
2319
2320   def CheckPrereq(self):
2321     """Check prerequisites.
2322
2323     This checks that the instance is in the cluster and is not running.
2324
2325     """
2326     instance = self.cfg.GetInstanceInfo(
2327       self.cfg.ExpandInstanceName(self.op.instance_name))
2328     if instance is None:
2329       raise errors.OpPrereqError("Instance '%s' not known" %
2330                                  self.op.instance_name)
2331     if instance.disk_template == constants.DT_DISKLESS:
2332       raise errors.OpPrereqError("Instance '%s' has no disks" %
2333                                  self.op.instance_name)
2334     if instance.status != "down":
2335       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2336                                  self.op.instance_name)
2337     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2338     if remote_info:
2339       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2340                                  (self.op.instance_name,
2341                                   instance.primary_node))
2342
2343     self.op.os_type = getattr(self.op, "os_type", None)
2344     if self.op.os_type is not None:
2345       # OS verification
2346       pnode = self.cfg.GetNodeInfo(
2347         self.cfg.ExpandNodeName(instance.primary_node))
2348       if pnode is None:
2349         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2350                                    self.op.pnode)
2351       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2352       if not os_obj:
2353         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2354                                    " primary node"  % self.op.os_type)
2355
2356     self.instance = instance
2357
2358   def Exec(self, feedback_fn):
2359     """Reinstall the instance.
2360
2361     """
2362     inst = self.instance
2363
2364     if self.op.os_type is not None:
2365       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2366       inst.os = self.op.os_type
2367       self.cfg.AddInstance(inst)
2368
2369     _StartInstanceDisks(self.cfg, inst, None)
2370     try:
2371       feedback_fn("Running the instance OS create scripts...")
2372       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2373         raise errors.OpExecError("Could not install OS for instance %s"
2374                                  " on node %s" %
2375                                  (inst.name, inst.primary_node))
2376     finally:
2377       _ShutdownInstanceDisks(inst, self.cfg)
2378
2379
2380 class LURenameInstance(LogicalUnit):
2381   """Rename an instance.
2382
2383   """
2384   HPATH = "instance-rename"
2385   HTYPE = constants.HTYPE_INSTANCE
2386   _OP_REQP = ["instance_name", "new_name"]
2387
2388   def BuildHooksEnv(self):
2389     """Build hooks env.
2390
2391     This runs on master, primary and secondary nodes of the instance.
2392
2393     """
2394     env = _BuildInstanceHookEnvByObject(self.instance)
2395     env["INSTANCE_NEW_NAME"] = self.op.new_name
2396     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2397           list(self.instance.secondary_nodes))
2398     return env, nl, nl
2399
2400   def CheckPrereq(self):
2401     """Check prerequisites.
2402
2403     This checks that the instance is in the cluster and is not running.
2404
2405     """
2406     instance = self.cfg.GetInstanceInfo(
2407       self.cfg.ExpandInstanceName(self.op.instance_name))
2408     if instance is None:
2409       raise errors.OpPrereqError("Instance '%s' not known" %
2410                                  self.op.instance_name)
2411     if instance.status != "down":
2412       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2413                                  self.op.instance_name)
2414     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2415     if remote_info:
2416       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2417                                  (self.op.instance_name,
2418                                   instance.primary_node))
2419     self.instance = instance
2420
2421     # new name verification
2422     name_info = utils.HostInfo(self.op.new_name)
2423
2424     self.op.new_name = new_name = name_info.name
2425     instance_list = self.cfg.GetInstanceList()
2426     if new_name in instance_list:
2427       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2428                                  new_name)
2429
2430     if not getattr(self.op, "ignore_ip", False):
2431       command = ["fping", "-q", name_info.ip]
2432       result = utils.RunCmd(command)
2433       if not result.failed:
2434         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2435                                    (name_info.ip, new_name))
2436
2437
2438   def Exec(self, feedback_fn):
2439     """Reinstall the instance.
2440
2441     """
2442     inst = self.instance
2443     old_name = inst.name
2444
2445     self.cfg.RenameInstance(inst.name, self.op.new_name)
2446
2447     # re-read the instance from the configuration after rename
2448     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2449
2450     _StartInstanceDisks(self.cfg, inst, None)
2451     try:
2452       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2453                                           "sda", "sdb"):
2454         msg = ("Could run OS rename script for instance %s on node %s (but the"
2455                " instance has been renamed in Ganeti)" %
2456                (inst.name, inst.primary_node))
2457         logger.Error(msg)
2458     finally:
2459       _ShutdownInstanceDisks(inst, self.cfg)
2460
2461
2462 class LURemoveInstance(LogicalUnit):
2463   """Remove an instance.
2464
2465   """
2466   HPATH = "instance-remove"
2467   HTYPE = constants.HTYPE_INSTANCE
2468   _OP_REQP = ["instance_name"]
2469
2470   def BuildHooksEnv(self):
2471     """Build hooks env.
2472
2473     This runs on master, primary and secondary nodes of the instance.
2474
2475     """
2476     env = _BuildInstanceHookEnvByObject(self.instance)
2477     nl = [self.sstore.GetMasterNode()]
2478     return env, nl, nl
2479
2480   def CheckPrereq(self):
2481     """Check prerequisites.
2482
2483     This checks that the instance is in the cluster.
2484
2485     """
2486     instance = self.cfg.GetInstanceInfo(
2487       self.cfg.ExpandInstanceName(self.op.instance_name))
2488     if instance is None:
2489       raise errors.OpPrereqError("Instance '%s' not known" %
2490                                  self.op.instance_name)
2491     self.instance = instance
2492
2493   def Exec(self, feedback_fn):
2494     """Remove the instance.
2495
2496     """
2497     instance = self.instance
2498     logger.Info("shutting down instance %s on node %s" %
2499                 (instance.name, instance.primary_node))
2500
2501     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2502       if self.op.ignore_failures:
2503         feedback_fn("Warning: can't shutdown instance")
2504       else:
2505         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2506                                  (instance.name, instance.primary_node))
2507
2508     logger.Info("removing block devices for instance %s" % instance.name)
2509
2510     if not _RemoveDisks(instance, self.cfg):
2511       if self.op.ignore_failures:
2512         feedback_fn("Warning: can't remove instance's disks")
2513       else:
2514         raise errors.OpExecError("Can't remove instance's disks")
2515
2516     logger.Info("removing instance %s out of cluster config" % instance.name)
2517
2518     self.cfg.RemoveInstance(instance.name)
2519
2520
2521 class LUQueryInstances(NoHooksLU):
2522   """Logical unit for querying instances.
2523
2524   """
2525   _OP_REQP = ["output_fields", "names"]
2526
2527   def CheckPrereq(self):
2528     """Check prerequisites.
2529
2530     This checks that the fields required are valid output fields.
2531
2532     """
2533     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2534     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2535                                "admin_state", "admin_ram",
2536                                "disk_template", "ip", "mac", "bridge",
2537                                "sda_size", "sdb_size", "vcpus"],
2538                        dynamic=self.dynamic_fields,
2539                        selected=self.op.output_fields)
2540
2541     self.wanted = _GetWantedInstances(self, self.op.names)
2542
2543   def Exec(self, feedback_fn):
2544     """Computes the list of nodes and their attributes.
2545
2546     """
2547     instance_names = self.wanted
2548     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2549                      in instance_names]
2550
2551     # begin data gathering
2552
2553     nodes = frozenset([inst.primary_node for inst in instance_list])
2554
2555     bad_nodes = []
2556     if self.dynamic_fields.intersection(self.op.output_fields):
2557       live_data = {}
2558       node_data = rpc.call_all_instances_info(nodes)
2559       for name in nodes:
2560         result = node_data[name]
2561         if result:
2562           live_data.update(result)
2563         elif result == False:
2564           bad_nodes.append(name)
2565         # else no instance is alive
2566     else:
2567       live_data = dict([(name, {}) for name in instance_names])
2568
2569     # end data gathering
2570
2571     output = []
2572     for instance in instance_list:
2573       iout = []
2574       for field in self.op.output_fields:
2575         if field == "name":
2576           val = instance.name
2577         elif field == "os":
2578           val = instance.os
2579         elif field == "pnode":
2580           val = instance.primary_node
2581         elif field == "snodes":
2582           val = list(instance.secondary_nodes)
2583         elif field == "admin_state":
2584           val = (instance.status != "down")
2585         elif field == "oper_state":
2586           if instance.primary_node in bad_nodes:
2587             val = None
2588           else:
2589             val = bool(live_data.get(instance.name))
2590         elif field == "status":
2591           if instance.primary_node in bad_nodes:
2592             val = "ERROR_nodedown"
2593           else:
2594             running = bool(live_data.get(instance.name))
2595             if running:
2596               if instance.status != "down":
2597                 val = "running"
2598               else:
2599                 val = "ERROR_up"
2600             else:
2601               if instance.status != "down":
2602                 val = "ERROR_down"
2603               else:
2604                 val = "ADMIN_down"
2605         elif field == "admin_ram":
2606           val = instance.memory
2607         elif field == "oper_ram":
2608           if instance.primary_node in bad_nodes:
2609             val = None
2610           elif instance.name in live_data:
2611             val = live_data[instance.name].get("memory", "?")
2612           else:
2613             val = "-"
2614         elif field == "disk_template":
2615           val = instance.disk_template
2616         elif field == "ip":
2617           val = instance.nics[0].ip
2618         elif field == "bridge":
2619           val = instance.nics[0].bridge
2620         elif field == "mac":
2621           val = instance.nics[0].mac
2622         elif field == "sda_size" or field == "sdb_size":
2623           disk = instance.FindDisk(field[:3])
2624           if disk is None:
2625             val = None
2626           else:
2627             val = disk.size
2628         elif field == "vcpus":
2629           val = instance.vcpus
2630         else:
2631           raise errors.ParameterError(field)
2632         iout.append(val)
2633       output.append(iout)
2634
2635     return output
2636
2637
2638 class LUFailoverInstance(LogicalUnit):
2639   """Failover an instance.
2640
2641   """
2642   HPATH = "instance-failover"
2643   HTYPE = constants.HTYPE_INSTANCE
2644   _OP_REQP = ["instance_name", "ignore_consistency"]
2645
2646   def BuildHooksEnv(self):
2647     """Build hooks env.
2648
2649     This runs on master, primary and secondary nodes of the instance.
2650
2651     """
2652     env = {
2653       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2654       }
2655     env.update(_BuildInstanceHookEnvByObject(self.instance))
2656     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2657     return env, nl, nl
2658
2659   def CheckPrereq(self):
2660     """Check prerequisites.
2661
2662     This checks that the instance is in the cluster.
2663
2664     """
2665     instance = self.cfg.GetInstanceInfo(
2666       self.cfg.ExpandInstanceName(self.op.instance_name))
2667     if instance is None:
2668       raise errors.OpPrereqError("Instance '%s' not known" %
2669                                  self.op.instance_name)
2670
2671     if instance.disk_template not in constants.DTS_NET_MIRROR:
2672       raise errors.OpPrereqError("Instance's disk layout is not"
2673                                  " network mirrored, cannot failover.")
2674
2675     secondary_nodes = instance.secondary_nodes
2676     if not secondary_nodes:
2677       raise errors.ProgrammerError("no secondary node but using "
2678                                    "DT_REMOTE_RAID1 template")
2679
2680     target_node = secondary_nodes[0]
2681     # check memory requirements on the secondary node
2682     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2683                          instance.name, instance.memory)
2684
2685     # check bridge existance
2686     brlist = [nic.bridge for nic in instance.nics]
2687     if not rpc.call_bridges_exist(target_node, brlist):
2688       raise errors.OpPrereqError("One or more target bridges %s does not"
2689                                  " exist on destination node '%s'" %
2690                                  (brlist, target_node))
2691
2692     self.instance = instance
2693
2694   def Exec(self, feedback_fn):
2695     """Failover an instance.
2696
2697     The failover is done by shutting it down on its present node and
2698     starting it on the secondary.
2699
2700     """
2701     instance = self.instance
2702
2703     source_node = instance.primary_node
2704     target_node = instance.secondary_nodes[0]
2705
2706     feedback_fn("* checking disk consistency between source and target")
2707     for dev in instance.disks:
2708       # for remote_raid1, these are md over drbd
2709       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2710         if instance.status == "up" and not self.op.ignore_consistency:
2711           raise errors.OpExecError("Disk %s is degraded on target node,"
2712                                    " aborting failover." % dev.iv_name)
2713
2714     feedback_fn("* shutting down instance on source node")
2715     logger.Info("Shutting down instance %s on node %s" %
2716                 (instance.name, source_node))
2717
2718     if not rpc.call_instance_shutdown(source_node, instance):
2719       if self.op.ignore_consistency:
2720         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2721                      " anyway. Please make sure node %s is down"  %
2722                      (instance.name, source_node, source_node))
2723       else:
2724         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2725                                  (instance.name, source_node))
2726
2727     feedback_fn("* deactivating the instance's disks on source node")
2728     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2729       raise errors.OpExecError("Can't shut down the instance's disks.")
2730
2731     instance.primary_node = target_node
2732     # distribute new instance config to the other nodes
2733     self.cfg.AddInstance(instance)
2734
2735     # Only start the instance if it's marked as up
2736     if instance.status == "up":
2737       feedback_fn("* activating the instance's disks on target node")
2738       logger.Info("Starting instance %s on node %s" %
2739                   (instance.name, target_node))
2740
2741       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2742                                                ignore_secondaries=True)
2743       if not disks_ok:
2744         _ShutdownInstanceDisks(instance, self.cfg)
2745         raise errors.OpExecError("Can't activate the instance's disks")
2746
2747       feedback_fn("* starting the instance on the target node")
2748       if not rpc.call_instance_start(target_node, instance, None):
2749         _ShutdownInstanceDisks(instance, self.cfg)
2750         raise errors.OpExecError("Could not start instance %s on node %s." %
2751                                  (instance.name, target_node))
2752
2753
2754 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2755   """Create a tree of block devices on the primary node.
2756
2757   This always creates all devices.
2758
2759   """
2760   if device.children:
2761     for child in device.children:
2762       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2763         return False
2764
2765   cfg.SetDiskID(device, node)
2766   new_id = rpc.call_blockdev_create(node, device, device.size,
2767                                     instance.name, True, info)
2768   if not new_id:
2769     return False
2770   if device.physical_id is None:
2771     device.physical_id = new_id
2772   return True
2773
2774
2775 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2776   """Create a tree of block devices on a secondary node.
2777
2778   If this device type has to be created on secondaries, create it and
2779   all its children.
2780
2781   If not, just recurse to children keeping the same 'force' value.
2782
2783   """
2784   if device.CreateOnSecondary():
2785     force = True
2786   if device.children:
2787     for child in device.children:
2788       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2789                                         child, force, info):
2790         return False
2791
2792   if not force:
2793     return True
2794   cfg.SetDiskID(device, node)
2795   new_id = rpc.call_blockdev_create(node, device, device.size,
2796                                     instance.name, False, info)
2797   if not new_id:
2798     return False
2799   if device.physical_id is None:
2800     device.physical_id = new_id
2801   return True
2802
2803
2804 def _GenerateUniqueNames(cfg, exts):
2805   """Generate a suitable LV name.
2806
2807   This will generate a logical volume name for the given instance.
2808
2809   """
2810   results = []
2811   for val in exts:
2812     new_id = cfg.GenerateUniqueID()
2813     results.append("%s%s" % (new_id, val))
2814   return results
2815
2816
2817 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2818   """Generate a drbd device complete with its children.
2819
2820   """
2821   port = cfg.AllocatePort()
2822   vgname = cfg.GetVGName()
2823   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2824                           logical_id=(vgname, names[0]))
2825   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2826                           logical_id=(vgname, names[1]))
2827   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2828                           logical_id = (primary, secondary, port),
2829                           children = [dev_data, dev_meta])
2830   return drbd_dev
2831
2832
2833 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2834   """Generate a drbd8 device complete with its children.
2835
2836   """
2837   port = cfg.AllocatePort()
2838   vgname = cfg.GetVGName()
2839   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2840                           logical_id=(vgname, names[0]))
2841   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2842                           logical_id=(vgname, names[1]))
2843   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2844                           logical_id = (primary, secondary, port),
2845                           children = [dev_data, dev_meta],
2846                           iv_name=iv_name)
2847   return drbd_dev
2848
2849 def _GenerateDiskTemplate(cfg, template_name,
2850                           instance_name, primary_node,
2851                           secondary_nodes, disk_sz, swap_sz):
2852   """Generate the entire disk layout for a given template type.
2853
2854   """
2855   #TODO: compute space requirements
2856
2857   vgname = cfg.GetVGName()
2858   if template_name == constants.DT_DISKLESS:
2859     disks = []
2860   elif template_name == constants.DT_PLAIN:
2861     if len(secondary_nodes) != 0:
2862       raise errors.ProgrammerError("Wrong template configuration")
2863
2864     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2865     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2866                            logical_id=(vgname, names[0]),
2867                            iv_name = "sda")
2868     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2869                            logical_id=(vgname, names[1]),
2870                            iv_name = "sdb")
2871     disks = [sda_dev, sdb_dev]
2872   elif template_name == constants.DT_LOCAL_RAID1:
2873     if len(secondary_nodes) != 0:
2874       raise errors.ProgrammerError("Wrong template configuration")
2875
2876
2877     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2878                                        ".sdb_m1", ".sdb_m2"])
2879     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2880                               logical_id=(vgname, names[0]))
2881     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2882                               logical_id=(vgname, names[1]))
2883     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2884                               size=disk_sz,
2885                               children = [sda_dev_m1, sda_dev_m2])
2886     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2887                               logical_id=(vgname, names[2]))
2888     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2889                               logical_id=(vgname, names[3]))
2890     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2891                               size=swap_sz,
2892                               children = [sdb_dev_m1, sdb_dev_m2])
2893     disks = [md_sda_dev, md_sdb_dev]
2894   elif template_name == constants.DT_REMOTE_RAID1:
2895     if len(secondary_nodes) != 1:
2896       raise errors.ProgrammerError("Wrong template configuration")
2897     remote_node = secondary_nodes[0]
2898     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2899                                        ".sdb_data", ".sdb_meta"])
2900     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2901                                          disk_sz, names[0:2])
2902     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2903                               children = [drbd_sda_dev], size=disk_sz)
2904     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2905                                          swap_sz, names[2:4])
2906     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2907                               children = [drbd_sdb_dev], size=swap_sz)
2908     disks = [md_sda_dev, md_sdb_dev]
2909   elif template_name == constants.DT_DRBD8:
2910     if len(secondary_nodes) != 1:
2911       raise errors.ProgrammerError("Wrong template configuration")
2912     remote_node = secondary_nodes[0]
2913     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2914                                        ".sdb_data", ".sdb_meta"])
2915     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2916                                          disk_sz, names[0:2], "sda")
2917     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2918                                          swap_sz, names[2:4], "sdb")
2919     disks = [drbd_sda_dev, drbd_sdb_dev]
2920   else:
2921     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2922   return disks
2923
2924
2925 def _GetInstanceInfoText(instance):
2926   """Compute that text that should be added to the disk's metadata.
2927
2928   """
2929   return "originstname+%s" % instance.name
2930
2931
2932 def _CreateDisks(cfg, instance):
2933   """Create all disks for an instance.
2934
2935   This abstracts away some work from AddInstance.
2936
2937   Args:
2938     instance: the instance object
2939
2940   Returns:
2941     True or False showing the success of the creation process
2942
2943   """
2944   info = _GetInstanceInfoText(instance)
2945
2946   for device in instance.disks:
2947     logger.Info("creating volume %s for instance %s" %
2948               (device.iv_name, instance.name))
2949     #HARDCODE
2950     for secondary_node in instance.secondary_nodes:
2951       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2952                                         device, False, info):
2953         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2954                      (device.iv_name, device, secondary_node))
2955         return False
2956     #HARDCODE
2957     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2958                                     instance, device, info):
2959       logger.Error("failed to create volume %s on primary!" %
2960                    device.iv_name)
2961       return False
2962   return True
2963
2964
2965 def _RemoveDisks(instance, cfg):
2966   """Remove all disks for an instance.
2967
2968   This abstracts away some work from `AddInstance()` and
2969   `RemoveInstance()`. Note that in case some of the devices couldn't
2970   be removed, the removal will continue with the other ones (compare
2971   with `_CreateDisks()`).
2972
2973   Args:
2974     instance: the instance object
2975
2976   Returns:
2977     True or False showing the success of the removal proces
2978
2979   """
2980   logger.Info("removing block devices for instance %s" % instance.name)
2981
2982   result = True
2983   for device in instance.disks:
2984     for node, disk in device.ComputeNodeTree(instance.primary_node):
2985       cfg.SetDiskID(disk, node)
2986       if not rpc.call_blockdev_remove(node, disk):
2987         logger.Error("could not remove block device %s on node %s,"
2988                      " continuing anyway" %
2989                      (device.iv_name, node))
2990         result = False
2991   return result
2992
2993
2994 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2995   """Compute disk size requirements in the volume group
2996
2997   This is currently hard-coded for the two-drive layout.
2998
2999   """
3000   # Required free disk space as a function of disk and swap space
3001   req_size_dict = {
3002     constants.DT_DISKLESS: None,
3003     constants.DT_PLAIN: disk_size + swap_size,
3004     constants.DT_LOCAL_RAID1: (disk_size + swap_size) * 2,
3005     # 256 MB are added for drbd metadata, 128MB for each drbd device
3006     constants.DT_REMOTE_RAID1: disk_size + swap_size + 256,
3007     constants.DT_DRBD8: disk_size + swap_size + 256,
3008   }
3009
3010   if disk_template not in req_size_dict:
3011     raise errors.ProgrammerError("Disk template '%s' size requirement"
3012                                  " is unknown" %  disk_template)
3013
3014   return req_size_dict[disk_template]
3015
3016
3017 class LUCreateInstance(LogicalUnit):
3018   """Create an instance.
3019
3020   """
3021   HPATH = "instance-add"
3022   HTYPE = constants.HTYPE_INSTANCE
3023   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3024               "disk_template", "swap_size", "mode", "start", "vcpus",
3025               "wait_for_sync", "ip_check", "mac"]
3026
3027   def _RunAllocator(self):
3028     """Run the allocator based on input opcode.
3029
3030     """
3031     disks = [{"size": self.op.disk_size, "mode": "w"},
3032              {"size": self.op.swap_size, "mode": "w"}]
3033     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3034              "bridge": self.op.bridge}]
3035     ial = IAllocator(self.cfg, self.sstore,
3036                      mode=constants.IALLOCATOR_MODE_ALLOC,
3037                      name=self.op.instance_name,
3038                      disk_template=self.op.disk_template,
3039                      tags=[],
3040                      os=self.op.os_type,
3041                      vcpus=self.op.vcpus,
3042                      mem_size=self.op.mem_size,
3043                      disks=disks,
3044                      nics=nics,
3045                      )
3046
3047     ial.Run(self.op.iallocator)
3048
3049     if not ial.success:
3050       raise errors.OpPrereqError("Can't compute nodes using"
3051                                  " iallocator '%s': %s" % (self.op.iallocator,
3052                                                            ial.info))
3053     if len(ial.nodes) != ial.required_nodes:
3054       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3055                                  " of nodes (%s), required %s" %
3056                                  (len(ial.nodes), ial.required_nodes))
3057     self.op.pnode = ial.nodes[0]
3058     logger.ToStdout("Selected nodes for the instance: %s" %
3059                     (", ".join(ial.nodes),))
3060     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3061                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3062     if ial.required_nodes == 2:
3063       self.op.snode = ial.nodes[1]
3064
3065   def BuildHooksEnv(self):
3066     """Build hooks env.
3067
3068     This runs on master, primary and secondary nodes of the instance.
3069
3070     """
3071     env = {
3072       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3073       "INSTANCE_DISK_SIZE": self.op.disk_size,
3074       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3075       "INSTANCE_ADD_MODE": self.op.mode,
3076       }
3077     if self.op.mode == constants.INSTANCE_IMPORT:
3078       env["INSTANCE_SRC_NODE"] = self.op.src_node
3079       env["INSTANCE_SRC_PATH"] = self.op.src_path
3080       env["INSTANCE_SRC_IMAGE"] = self.src_image
3081
3082     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3083       primary_node=self.op.pnode,
3084       secondary_nodes=self.secondaries,
3085       status=self.instance_status,
3086       os_type=self.op.os_type,
3087       memory=self.op.mem_size,
3088       vcpus=self.op.vcpus,
3089       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3090     ))
3091
3092     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3093           self.secondaries)
3094     return env, nl, nl
3095
3096
3097   def CheckPrereq(self):
3098     """Check prerequisites.
3099
3100     """
3101     # set optional parameters to none if they don't exist
3102     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3103                  "iallocator"]:
3104       if not hasattr(self.op, attr):
3105         setattr(self.op, attr, None)
3106
3107     if self.op.mode not in (constants.INSTANCE_CREATE,
3108                             constants.INSTANCE_IMPORT):
3109       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3110                                  self.op.mode)
3111
3112     if self.op.mode == constants.INSTANCE_IMPORT:
3113       src_node = getattr(self.op, "src_node", None)
3114       src_path = getattr(self.op, "src_path", None)
3115       if src_node is None or src_path is None:
3116         raise errors.OpPrereqError("Importing an instance requires source"
3117                                    " node and path options")
3118       src_node_full = self.cfg.ExpandNodeName(src_node)
3119       if src_node_full is None:
3120         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3121       self.op.src_node = src_node = src_node_full
3122
3123       if not os.path.isabs(src_path):
3124         raise errors.OpPrereqError("The source path must be absolute")
3125
3126       export_info = rpc.call_export_info(src_node, src_path)
3127
3128       if not export_info:
3129         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3130
3131       if not export_info.has_section(constants.INISECT_EXP):
3132         raise errors.ProgrammerError("Corrupted export config")
3133
3134       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3135       if (int(ei_version) != constants.EXPORT_VERSION):
3136         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3137                                    (ei_version, constants.EXPORT_VERSION))
3138
3139       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3140         raise errors.OpPrereqError("Can't import instance with more than"
3141                                    " one data disk")
3142
3143       # FIXME: are the old os-es, disk sizes, etc. useful?
3144       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3145       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3146                                                          'disk0_dump'))
3147       self.src_image = diskimage
3148     else: # INSTANCE_CREATE
3149       if getattr(self.op, "os_type", None) is None:
3150         raise errors.OpPrereqError("No guest OS specified")
3151
3152     #### instance parameters check
3153
3154     # disk template and mirror node verification
3155     if self.op.disk_template not in constants.DISK_TEMPLATES:
3156       raise errors.OpPrereqError("Invalid disk template name")
3157
3158     # instance name verification
3159     hostname1 = utils.HostInfo(self.op.instance_name)
3160
3161     self.op.instance_name = instance_name = hostname1.name
3162     instance_list = self.cfg.GetInstanceList()
3163     if instance_name in instance_list:
3164       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3165                                  instance_name)
3166
3167     # ip validity checks
3168     ip = getattr(self.op, "ip", None)
3169     if ip is None or ip.lower() == "none":
3170       inst_ip = None
3171     elif ip.lower() == "auto":
3172       inst_ip = hostname1.ip
3173     else:
3174       if not utils.IsValidIP(ip):
3175         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3176                                    " like a valid IP" % ip)
3177       inst_ip = ip
3178     self.inst_ip = self.op.ip = inst_ip
3179
3180     if self.op.start and not self.op.ip_check:
3181       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3182                                  " adding an instance in start mode")
3183
3184     if self.op.ip_check:
3185       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3186         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3187                                    (hostname1.ip, instance_name))
3188
3189     # MAC address verification
3190     if self.op.mac != "auto":
3191       if not utils.IsValidMac(self.op.mac.lower()):
3192         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3193                                    self.op.mac)
3194
3195     # bridge verification
3196     bridge = getattr(self.op, "bridge", None)
3197     if bridge is None:
3198       self.op.bridge = self.cfg.GetDefBridge()
3199     else:
3200       self.op.bridge = bridge
3201
3202     # boot order verification
3203     if self.op.hvm_boot_order is not None:
3204       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3205         raise errors.OpPrereqError("invalid boot order specified,"
3206                                    " must be one or more of [acdn]")
3207     #### allocator run
3208
3209     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3210       raise errors.OpPrereqError("One and only one of iallocator and primary"
3211                                  " node must be given")
3212
3213     if self.op.iallocator is not None:
3214       self._RunAllocator()
3215
3216     #### node related checks
3217
3218     # check primary node
3219     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3220     if pnode is None:
3221       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3222                                  self.op.pnode)
3223     self.op.pnode = pnode.name
3224     self.pnode = pnode
3225     self.secondaries = []
3226
3227     # mirror node verification
3228     if self.op.disk_template in constants.DTS_NET_MIRROR:
3229       if getattr(self.op, "snode", None) is None:
3230         raise errors.OpPrereqError("The networked disk templates need"
3231                                    " a mirror node")
3232
3233       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3234       if snode_name is None:
3235         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3236                                    self.op.snode)
3237       elif snode_name == pnode.name:
3238         raise errors.OpPrereqError("The secondary node cannot be"
3239                                    " the primary node.")
3240       self.secondaries.append(snode_name)
3241
3242     req_size = _ComputeDiskSize(self.op.disk_template,
3243                                 self.op.disk_size, self.op.swap_size)
3244
3245     # Check lv size requirements
3246     if req_size is not None:
3247       nodenames = [pnode.name] + self.secondaries
3248       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3249       for node in nodenames:
3250         info = nodeinfo.get(node, None)
3251         if not info:
3252           raise errors.OpPrereqError("Cannot get current information"
3253                                      " from node '%s'" % nodeinfo)
3254         vg_free = info.get('vg_free', None)
3255         if not isinstance(vg_free, int):
3256           raise errors.OpPrereqError("Can't compute free disk space on"
3257                                      " node %s" % node)
3258         if req_size > info['vg_free']:
3259           raise errors.OpPrereqError("Not enough disk space on target node %s."
3260                                      " %d MB available, %d MB required" %
3261                                      (node, info['vg_free'], req_size))
3262
3263     # os verification
3264     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3265     if not os_obj:
3266       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3267                                  " primary node"  % self.op.os_type)
3268
3269     if self.op.kernel_path == constants.VALUE_NONE:
3270       raise errors.OpPrereqError("Can't set instance kernel to none")
3271
3272
3273     # bridge check on primary node
3274     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3275       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3276                                  " destination node '%s'" %
3277                                  (self.op.bridge, pnode.name))
3278
3279     if self.op.start:
3280       self.instance_status = 'up'
3281     else:
3282       self.instance_status = 'down'
3283
3284   def Exec(self, feedback_fn):
3285     """Create and add the instance to the cluster.
3286
3287     """
3288     instance = self.op.instance_name
3289     pnode_name = self.pnode.name
3290
3291     if self.op.mac == "auto":
3292       mac_address = self.cfg.GenerateMAC()
3293     else:
3294       mac_address = self.op.mac
3295
3296     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3297     if self.inst_ip is not None:
3298       nic.ip = self.inst_ip
3299
3300     ht_kind = self.sstore.GetHypervisorType()
3301     if ht_kind in constants.HTS_REQ_PORT:
3302       network_port = self.cfg.AllocatePort()
3303     else:
3304       network_port = None
3305
3306     disks = _GenerateDiskTemplate(self.cfg,
3307                                   self.op.disk_template,
3308                                   instance, pnode_name,
3309                                   self.secondaries, self.op.disk_size,
3310                                   self.op.swap_size)
3311
3312     iobj = objects.Instance(name=instance, os=self.op.os_type,
3313                             primary_node=pnode_name,
3314                             memory=self.op.mem_size,
3315                             vcpus=self.op.vcpus,
3316                             nics=[nic], disks=disks,
3317                             disk_template=self.op.disk_template,
3318                             status=self.instance_status,
3319                             network_port=network_port,
3320                             kernel_path=self.op.kernel_path,
3321                             initrd_path=self.op.initrd_path,
3322                             hvm_boot_order=self.op.hvm_boot_order,
3323                             )
3324
3325     feedback_fn("* creating instance disks...")
3326     if not _CreateDisks(self.cfg, iobj):
3327       _RemoveDisks(iobj, self.cfg)
3328       raise errors.OpExecError("Device creation failed, reverting...")
3329
3330     feedback_fn("adding instance %s to cluster config" % instance)
3331
3332     self.cfg.AddInstance(iobj)
3333
3334     if self.op.wait_for_sync:
3335       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3336     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3337       # make sure the disks are not degraded (still sync-ing is ok)
3338       time.sleep(15)
3339       feedback_fn("* checking mirrors status")
3340       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3341     else:
3342       disk_abort = False
3343
3344     if disk_abort:
3345       _RemoveDisks(iobj, self.cfg)
3346       self.cfg.RemoveInstance(iobj.name)
3347       raise errors.OpExecError("There are some degraded disks for"
3348                                " this instance")
3349
3350     feedback_fn("creating os for instance %s on node %s" %
3351                 (instance, pnode_name))
3352
3353     if iobj.disk_template != constants.DT_DISKLESS:
3354       if self.op.mode == constants.INSTANCE_CREATE:
3355         feedback_fn("* running the instance OS create scripts...")
3356         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3357           raise errors.OpExecError("could not add os for instance %s"
3358                                    " on node %s" %
3359                                    (instance, pnode_name))
3360
3361       elif self.op.mode == constants.INSTANCE_IMPORT:
3362         feedback_fn("* running the instance OS import scripts...")
3363         src_node = self.op.src_node
3364         src_image = self.src_image
3365         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3366                                                 src_node, src_image):
3367           raise errors.OpExecError("Could not import os for instance"
3368                                    " %s on node %s" %
3369                                    (instance, pnode_name))
3370       else:
3371         # also checked in the prereq part
3372         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3373                                      % self.op.mode)
3374
3375     if self.op.start:
3376       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3377       feedback_fn("* starting instance...")
3378       if not rpc.call_instance_start(pnode_name, iobj, None):
3379         raise errors.OpExecError("Could not start instance")
3380
3381
3382 class LUConnectConsole(NoHooksLU):
3383   """Connect to an instance's console.
3384
3385   This is somewhat special in that it returns the command line that
3386   you need to run on the master node in order to connect to the
3387   console.
3388
3389   """
3390   _OP_REQP = ["instance_name"]
3391
3392   def CheckPrereq(self):
3393     """Check prerequisites.
3394
3395     This checks that the instance is in the cluster.
3396
3397     """
3398     instance = self.cfg.GetInstanceInfo(
3399       self.cfg.ExpandInstanceName(self.op.instance_name))
3400     if instance is None:
3401       raise errors.OpPrereqError("Instance '%s' not known" %
3402                                  self.op.instance_name)
3403     self.instance = instance
3404
3405   def Exec(self, feedback_fn):
3406     """Connect to the console of an instance
3407
3408     """
3409     instance = self.instance
3410     node = instance.primary_node
3411
3412     node_insts = rpc.call_instance_list([node])[node]
3413     if node_insts is False:
3414       raise errors.OpExecError("Can't connect to node %s." % node)
3415
3416     if instance.name not in node_insts:
3417       raise errors.OpExecError("Instance %s is not running." % instance.name)
3418
3419     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3420
3421     hyper = hypervisor.GetHypervisor()
3422     console_cmd = hyper.GetShellCommandForConsole(instance)
3423     # build ssh cmdline
3424     argv = ["ssh", "-q", "-t"]
3425     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3426     argv.extend(ssh.BATCH_MODE_OPTS)
3427     argv.append(node)
3428     argv.append(console_cmd)
3429     return "ssh", argv
3430
3431
3432 class LUAddMDDRBDComponent(LogicalUnit):
3433   """Adda new mirror member to an instance's disk.
3434
3435   """
3436   HPATH = "mirror-add"
3437   HTYPE = constants.HTYPE_INSTANCE
3438   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3439
3440   def BuildHooksEnv(self):
3441     """Build hooks env.
3442
3443     This runs on the master, the primary and all the secondaries.
3444
3445     """
3446     env = {
3447       "NEW_SECONDARY": self.op.remote_node,
3448       "DISK_NAME": self.op.disk_name,
3449       }
3450     env.update(_BuildInstanceHookEnvByObject(self.instance))
3451     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3452           self.op.remote_node,] + list(self.instance.secondary_nodes)
3453     return env, nl, nl
3454
3455   def CheckPrereq(self):
3456     """Check prerequisites.
3457
3458     This checks that the instance is in the cluster.
3459
3460     """
3461     instance = self.cfg.GetInstanceInfo(
3462       self.cfg.ExpandInstanceName(self.op.instance_name))
3463     if instance is None:
3464       raise errors.OpPrereqError("Instance '%s' not known" %
3465                                  self.op.instance_name)
3466     self.instance = instance
3467
3468     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3469     if remote_node is None:
3470       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3471     self.remote_node = remote_node
3472
3473     if remote_node == instance.primary_node:
3474       raise errors.OpPrereqError("The specified node is the primary node of"
3475                                  " the instance.")
3476
3477     if instance.disk_template != constants.DT_REMOTE_RAID1:
3478       raise errors.OpPrereqError("Instance's disk layout is not"
3479                                  " remote_raid1.")
3480     for disk in instance.disks:
3481       if disk.iv_name == self.op.disk_name:
3482         break
3483     else:
3484       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3485                                  " instance." % self.op.disk_name)
3486     if len(disk.children) > 1:
3487       raise errors.OpPrereqError("The device already has two slave devices."
3488                                  " This would create a 3-disk raid1 which we"
3489                                  " don't allow.")
3490     self.disk = disk
3491
3492   def Exec(self, feedback_fn):
3493     """Add the mirror component
3494
3495     """
3496     disk = self.disk
3497     instance = self.instance
3498
3499     remote_node = self.remote_node
3500     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3501     names = _GenerateUniqueNames(self.cfg, lv_names)
3502     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3503                                      remote_node, disk.size, names)
3504
3505     logger.Info("adding new mirror component on secondary")
3506     #HARDCODE
3507     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3508                                       new_drbd, False,
3509                                       _GetInstanceInfoText(instance)):
3510       raise errors.OpExecError("Failed to create new component on secondary"
3511                                " node %s" % remote_node)
3512
3513     logger.Info("adding new mirror component on primary")
3514     #HARDCODE
3515     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3516                                     instance, new_drbd,
3517                                     _GetInstanceInfoText(instance)):
3518       # remove secondary dev
3519       self.cfg.SetDiskID(new_drbd, remote_node)
3520       rpc.call_blockdev_remove(remote_node, new_drbd)
3521       raise errors.OpExecError("Failed to create volume on primary")
3522
3523     # the device exists now
3524     # call the primary node to add the mirror to md
3525     logger.Info("adding new mirror component to md")
3526     if not rpc.call_blockdev_addchildren(instance.primary_node,
3527                                          disk, [new_drbd]):
3528       logger.Error("Can't add mirror compoment to md!")
3529       self.cfg.SetDiskID(new_drbd, remote_node)
3530       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3531         logger.Error("Can't rollback on secondary")
3532       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3533       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3534         logger.Error("Can't rollback on primary")
3535       raise errors.OpExecError("Can't add mirror component to md array")
3536
3537     disk.children.append(new_drbd)
3538
3539     self.cfg.AddInstance(instance)
3540
3541     _WaitForSync(self.cfg, instance, self.proc)
3542
3543     return 0
3544
3545
3546 class LURemoveMDDRBDComponent(LogicalUnit):
3547   """Remove a component from a remote_raid1 disk.
3548
3549   """
3550   HPATH = "mirror-remove"
3551   HTYPE = constants.HTYPE_INSTANCE
3552   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3553
3554   def BuildHooksEnv(self):
3555     """Build hooks env.
3556
3557     This runs on the master, the primary and all the secondaries.
3558
3559     """
3560     env = {
3561       "DISK_NAME": self.op.disk_name,
3562       "DISK_ID": self.op.disk_id,
3563       "OLD_SECONDARY": self.old_secondary,
3564       }
3565     env.update(_BuildInstanceHookEnvByObject(self.instance))
3566     nl = [self.sstore.GetMasterNode(),
3567           self.instance.primary_node] + list(self.instance.secondary_nodes)
3568     return env, nl, nl
3569
3570   def CheckPrereq(self):
3571     """Check prerequisites.
3572
3573     This checks that the instance is in the cluster.
3574
3575     """
3576     instance = self.cfg.GetInstanceInfo(
3577       self.cfg.ExpandInstanceName(self.op.instance_name))
3578     if instance is None:
3579       raise errors.OpPrereqError("Instance '%s' not known" %
3580                                  self.op.instance_name)
3581     self.instance = instance
3582
3583     if instance.disk_template != constants.DT_REMOTE_RAID1:
3584       raise errors.OpPrereqError("Instance's disk layout is not"
3585                                  " remote_raid1.")
3586     for disk in instance.disks:
3587       if disk.iv_name == self.op.disk_name:
3588         break
3589     else:
3590       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3591                                  " instance." % self.op.disk_name)
3592     for child in disk.children:
3593       if (child.dev_type == constants.LD_DRBD7 and
3594           child.logical_id[2] == self.op.disk_id):
3595         break
3596     else:
3597       raise errors.OpPrereqError("Can't find the device with this port.")
3598
3599     if len(disk.children) < 2:
3600       raise errors.OpPrereqError("Cannot remove the last component from"
3601                                  " a mirror.")
3602     self.disk = disk
3603     self.child = child
3604     if self.child.logical_id[0] == instance.primary_node:
3605       oid = 1
3606     else:
3607       oid = 0
3608     self.old_secondary = self.child.logical_id[oid]
3609
3610   def Exec(self, feedback_fn):
3611     """Remove the mirror component
3612
3613     """
3614     instance = self.instance
3615     disk = self.disk
3616     child = self.child
3617     logger.Info("remove mirror component")
3618     self.cfg.SetDiskID(disk, instance.primary_node)
3619     if not rpc.call_blockdev_removechildren(instance.primary_node,
3620                                             disk, [child]):
3621       raise errors.OpExecError("Can't remove child from mirror.")
3622
3623     for node in child.logical_id[:2]:
3624       self.cfg.SetDiskID(child, node)
3625       if not rpc.call_blockdev_remove(node, child):
3626         logger.Error("Warning: failed to remove device from node %s,"
3627                      " continuing operation." % node)
3628
3629     disk.children.remove(child)
3630     self.cfg.AddInstance(instance)
3631
3632
3633 class LUReplaceDisks(LogicalUnit):
3634   """Replace the disks of an instance.
3635
3636   """
3637   HPATH = "mirrors-replace"
3638   HTYPE = constants.HTYPE_INSTANCE
3639   _OP_REQP = ["instance_name", "mode", "disks"]
3640
3641   def _RunAllocator(self):
3642     """Compute a new secondary node using an IAllocator.
3643
3644     """
3645     ial = IAllocator(self.cfg, self.sstore,
3646                      mode=constants.IALLOCATOR_MODE_RELOC,
3647                      name=self.op.instance_name,
3648                      relocate_from=[self.sec_node])
3649
3650     ial.Run(self.op.iallocator)
3651
3652     if not ial.success:
3653       raise errors.OpPrereqError("Can't compute nodes using"
3654                                  " iallocator '%s': %s" % (self.op.iallocator,
3655                                                            ial.info))
3656     if len(ial.nodes) != ial.required_nodes:
3657       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3658                                  " of nodes (%s), required %s" %
3659                                  (len(ial.nodes), ial.required_nodes))
3660     self.op.remote_node = ial.nodes[0]
3661     logger.ToStdout("Selected new secondary for the instance: %s" %
3662                     self.op.remote_node)
3663
3664   def BuildHooksEnv(self):
3665     """Build hooks env.
3666
3667     This runs on the master, the primary and all the secondaries.
3668
3669     """
3670     env = {
3671       "MODE": self.op.mode,
3672       "NEW_SECONDARY": self.op.remote_node,
3673       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3674       }
3675     env.update(_BuildInstanceHookEnvByObject(self.instance))
3676     nl = [
3677       self.sstore.GetMasterNode(),
3678       self.instance.primary_node,
3679       ]
3680     if self.op.remote_node is not None:
3681       nl.append(self.op.remote_node)
3682     return env, nl, nl
3683
3684   def CheckPrereq(self):
3685     """Check prerequisites.
3686
3687     This checks that the instance is in the cluster.
3688
3689     """
3690     if not hasattr(self.op, "remote_node"):
3691       self.op.remote_node = None
3692
3693     instance = self.cfg.GetInstanceInfo(
3694       self.cfg.ExpandInstanceName(self.op.instance_name))
3695     if instance is None:
3696       raise errors.OpPrereqError("Instance '%s' not known" %
3697                                  self.op.instance_name)
3698     self.instance = instance
3699     self.op.instance_name = instance.name
3700
3701     if instance.disk_template not in constants.DTS_NET_MIRROR:
3702       raise errors.OpPrereqError("Instance's disk layout is not"
3703                                  " network mirrored.")
3704
3705     if len(instance.secondary_nodes) != 1:
3706       raise errors.OpPrereqError("The instance has a strange layout,"
3707                                  " expected one secondary but found %d" %
3708                                  len(instance.secondary_nodes))
3709
3710     self.sec_node = instance.secondary_nodes[0]
3711
3712     ia_name = getattr(self.op, "iallocator", None)
3713     if ia_name is not None:
3714       if self.op.remote_node is not None:
3715         raise errors.OpPrereqError("Give either the iallocator or the new"
3716                                    " secondary, not both")
3717       self.op.remote_node = self._RunAllocator()
3718
3719     remote_node = self.op.remote_node
3720     if remote_node is not None:
3721       remote_node = self.cfg.ExpandNodeName(remote_node)
3722       if remote_node is None:
3723         raise errors.OpPrereqError("Node '%s' not known" %
3724                                    self.op.remote_node)
3725       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3726     else:
3727       self.remote_node_info = None
3728     if remote_node == instance.primary_node:
3729       raise errors.OpPrereqError("The specified node is the primary node of"
3730                                  " the instance.")
3731     elif remote_node == self.sec_node:
3732       if self.op.mode == constants.REPLACE_DISK_SEC:
3733         # this is for DRBD8, where we can't execute the same mode of
3734         # replacement as for drbd7 (no different port allocated)
3735         raise errors.OpPrereqError("Same secondary given, cannot execute"
3736                                    " replacement")
3737       # the user gave the current secondary, switch to
3738       # 'no-replace-secondary' mode for drbd7
3739       remote_node = None
3740     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3741         self.op.mode != constants.REPLACE_DISK_ALL):
3742       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3743                                  " disks replacement, not individual ones")
3744     if instance.disk_template == constants.DT_DRBD8:
3745       if (self.op.mode == constants.REPLACE_DISK_ALL and
3746           remote_node is not None):
3747         # switch to replace secondary mode
3748         self.op.mode = constants.REPLACE_DISK_SEC
3749
3750       if self.op.mode == constants.REPLACE_DISK_ALL:
3751         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3752                                    " secondary disk replacement, not"
3753                                    " both at once")
3754       elif self.op.mode == constants.REPLACE_DISK_PRI:
3755         if remote_node is not None:
3756           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3757                                      " the secondary while doing a primary"
3758                                      " node disk replacement")
3759         self.tgt_node = instance.primary_node
3760         self.oth_node = instance.secondary_nodes[0]
3761       elif self.op.mode == constants.REPLACE_DISK_SEC:
3762         self.new_node = remote_node # this can be None, in which case
3763                                     # we don't change the secondary
3764         self.tgt_node = instance.secondary_nodes[0]
3765         self.oth_node = instance.primary_node
3766       else:
3767         raise errors.ProgrammerError("Unhandled disk replace mode")
3768
3769     for name in self.op.disks:
3770       if instance.FindDisk(name) is None:
3771         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3772                                    (name, instance.name))
3773     self.op.remote_node = remote_node
3774
3775   def _ExecRR1(self, feedback_fn):
3776     """Replace the disks of an instance.
3777
3778     """
3779     instance = self.instance
3780     iv_names = {}
3781     # start of work
3782     if self.op.remote_node is None:
3783       remote_node = self.sec_node
3784     else:
3785       remote_node = self.op.remote_node
3786     cfg = self.cfg
3787     for dev in instance.disks:
3788       size = dev.size
3789       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3790       names = _GenerateUniqueNames(cfg, lv_names)
3791       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3792                                        remote_node, size, names)
3793       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3794       logger.Info("adding new mirror component on secondary for %s" %
3795                   dev.iv_name)
3796       #HARDCODE
3797       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3798                                         new_drbd, False,
3799                                         _GetInstanceInfoText(instance)):
3800         raise errors.OpExecError("Failed to create new component on secondary"
3801                                  " node %s. Full abort, cleanup manually!" %
3802                                  remote_node)
3803
3804       logger.Info("adding new mirror component on primary")
3805       #HARDCODE
3806       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3807                                       instance, new_drbd,
3808                                       _GetInstanceInfoText(instance)):
3809         # remove secondary dev
3810         cfg.SetDiskID(new_drbd, remote_node)
3811         rpc.call_blockdev_remove(remote_node, new_drbd)
3812         raise errors.OpExecError("Failed to create volume on primary!"
3813                                  " Full abort, cleanup manually!!")
3814
3815       # the device exists now
3816       # call the primary node to add the mirror to md
3817       logger.Info("adding new mirror component to md")
3818       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3819                                            [new_drbd]):
3820         logger.Error("Can't add mirror compoment to md!")
3821         cfg.SetDiskID(new_drbd, remote_node)
3822         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3823           logger.Error("Can't rollback on secondary")
3824         cfg.SetDiskID(new_drbd, instance.primary_node)
3825         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3826           logger.Error("Can't rollback on primary")
3827         raise errors.OpExecError("Full abort, cleanup manually!!")
3828
3829       dev.children.append(new_drbd)
3830       cfg.AddInstance(instance)
3831
3832     # this can fail as the old devices are degraded and _WaitForSync
3833     # does a combined result over all disks, so we don't check its
3834     # return value
3835     _WaitForSync(cfg, instance, self.proc, unlock=True)
3836
3837     # so check manually all the devices
3838     for name in iv_names:
3839       dev, child, new_drbd = iv_names[name]
3840       cfg.SetDiskID(dev, instance.primary_node)
3841       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3842       if is_degr:
3843         raise errors.OpExecError("MD device %s is degraded!" % name)
3844       cfg.SetDiskID(new_drbd, instance.primary_node)
3845       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3846       if is_degr:
3847         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3848
3849     for name in iv_names:
3850       dev, child, new_drbd = iv_names[name]
3851       logger.Info("remove mirror %s component" % name)
3852       cfg.SetDiskID(dev, instance.primary_node)
3853       if not rpc.call_blockdev_removechildren(instance.primary_node,
3854                                               dev, [child]):
3855         logger.Error("Can't remove child from mirror, aborting"
3856                      " *this device cleanup*.\nYou need to cleanup manually!!")
3857         continue
3858
3859       for node in child.logical_id[:2]:
3860         logger.Info("remove child device on %s" % node)
3861         cfg.SetDiskID(child, node)
3862         if not rpc.call_blockdev_remove(node, child):
3863           logger.Error("Warning: failed to remove device from node %s,"
3864                        " continuing operation." % node)
3865
3866       dev.children.remove(child)
3867
3868       cfg.AddInstance(instance)
3869
3870   def _ExecD8DiskOnly(self, feedback_fn):
3871     """Replace a disk on the primary or secondary for dbrd8.
3872
3873     The algorithm for replace is quite complicated:
3874       - for each disk to be replaced:
3875         - create new LVs on the target node with unique names
3876         - detach old LVs from the drbd device
3877         - rename old LVs to name_replaced.<time_t>
3878         - rename new LVs to old LVs
3879         - attach the new LVs (with the old names now) to the drbd device
3880       - wait for sync across all devices
3881       - for each modified disk:
3882         - remove old LVs (which have the name name_replaces.<time_t>)
3883
3884     Failures are not very well handled.
3885
3886     """
3887     steps_total = 6
3888     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3889     instance = self.instance
3890     iv_names = {}
3891     vgname = self.cfg.GetVGName()
3892     # start of work
3893     cfg = self.cfg
3894     tgt_node = self.tgt_node
3895     oth_node = self.oth_node
3896
3897     # Step: check device activation
3898     self.proc.LogStep(1, steps_total, "check device existence")
3899     info("checking volume groups")
3900     my_vg = cfg.GetVGName()
3901     results = rpc.call_vg_list([oth_node, tgt_node])
3902     if not results:
3903       raise errors.OpExecError("Can't list volume groups on the nodes")
3904     for node in oth_node, tgt_node:
3905       res = results.get(node, False)
3906       if not res or my_vg not in res:
3907         raise errors.OpExecError("Volume group '%s' not found on %s" %
3908                                  (my_vg, node))
3909     for dev in instance.disks:
3910       if not dev.iv_name in self.op.disks:
3911         continue
3912       for node in tgt_node, oth_node:
3913         info("checking %s on %s" % (dev.iv_name, node))
3914         cfg.SetDiskID(dev, node)
3915         if not rpc.call_blockdev_find(node, dev):
3916           raise errors.OpExecError("Can't find device %s on node %s" %
3917                                    (dev.iv_name, node))
3918
3919     # Step: check other node consistency
3920     self.proc.LogStep(2, steps_total, "check peer consistency")
3921     for dev in instance.disks:
3922       if not dev.iv_name in self.op.disks:
3923         continue
3924       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3925       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3926                                    oth_node==instance.primary_node):
3927         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3928                                  " to replace disks on this node (%s)" %
3929                                  (oth_node, tgt_node))
3930
3931     # Step: create new storage
3932     self.proc.LogStep(3, steps_total, "allocate new storage")
3933     for dev in instance.disks:
3934       if not dev.iv_name in self.op.disks:
3935         continue
3936       size = dev.size
3937       cfg.SetDiskID(dev, tgt_node)
3938       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3939       names = _GenerateUniqueNames(cfg, lv_names)
3940       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3941                              logical_id=(vgname, names[0]))
3942       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3943                              logical_id=(vgname, names[1]))
3944       new_lvs = [lv_data, lv_meta]
3945       old_lvs = dev.children
3946       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3947       info("creating new local storage on %s for %s" %
3948            (tgt_node, dev.iv_name))
3949       # since we *always* want to create this LV, we use the
3950       # _Create...OnPrimary (which forces the creation), even if we
3951       # are talking about the secondary node
3952       for new_lv in new_lvs:
3953         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3954                                         _GetInstanceInfoText(instance)):
3955           raise errors.OpExecError("Failed to create new LV named '%s' on"
3956                                    " node '%s'" %
3957                                    (new_lv.logical_id[1], tgt_node))
3958
3959     # Step: for each lv, detach+rename*2+attach
3960     self.proc.LogStep(4, steps_total, "change drbd configuration")
3961     for dev, old_lvs, new_lvs in iv_names.itervalues():
3962       info("detaching %s drbd from local storage" % dev.iv_name)
3963       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3964         raise errors.OpExecError("Can't detach drbd from local storage on node"
3965                                  " %s for device %s" % (tgt_node, dev.iv_name))
3966       #dev.children = []
3967       #cfg.Update(instance)
3968
3969       # ok, we created the new LVs, so now we know we have the needed
3970       # storage; as such, we proceed on the target node to rename
3971       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3972       # using the assumption that logical_id == physical_id (which in
3973       # turn is the unique_id on that node)
3974
3975       # FIXME(iustin): use a better name for the replaced LVs
3976       temp_suffix = int(time.time())
3977       ren_fn = lambda d, suff: (d.physical_id[0],
3978                                 d.physical_id[1] + "_replaced-%s" % suff)
3979       # build the rename list based on what LVs exist on the node
3980       rlist = []
3981       for to_ren in old_lvs:
3982         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3983         if find_res is not None: # device exists
3984           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3985
3986       info("renaming the old LVs on the target node")
3987       if not rpc.call_blockdev_rename(tgt_node, rlist):
3988         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3989       # now we rename the new LVs to the old LVs
3990       info("renaming the new LVs on the target node")
3991       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3992       if not rpc.call_blockdev_rename(tgt_node, rlist):
3993         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3994
3995       for old, new in zip(old_lvs, new_lvs):
3996         new.logical_id = old.logical_id
3997         cfg.SetDiskID(new, tgt_node)
3998
3999       for disk in old_lvs:
4000         disk.logical_id = ren_fn(disk, temp_suffix)
4001         cfg.SetDiskID(disk, tgt_node)
4002
4003       # now that the new lvs have the old name, we can add them to the device
4004       info("adding new mirror component on %s" % tgt_node)
4005       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4006         for new_lv in new_lvs:
4007           if not rpc.call_blockdev_remove(tgt_node, new_lv):
4008             warning("Can't rollback device %s", hint="manually cleanup unused"
4009                     " logical volumes")
4010         raise errors.OpExecError("Can't add local storage to drbd")
4011
4012       dev.children = new_lvs
4013       cfg.Update(instance)
4014
4015     # Step: wait for sync
4016
4017     # this can fail as the old devices are degraded and _WaitForSync
4018     # does a combined result over all disks, so we don't check its
4019     # return value
4020     self.proc.LogStep(5, steps_total, "sync devices")
4021     _WaitForSync(cfg, instance, self.proc, unlock=True)
4022
4023     # so check manually all the devices
4024     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4025       cfg.SetDiskID(dev, instance.primary_node)
4026       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4027       if is_degr:
4028         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4029
4030     # Step: remove old storage
4031     self.proc.LogStep(6, steps_total, "removing old storage")
4032     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4033       info("remove logical volumes for %s" % name)
4034       for lv in old_lvs:
4035         cfg.SetDiskID(lv, tgt_node)
4036         if not rpc.call_blockdev_remove(tgt_node, lv):
4037           warning("Can't remove old LV", hint="manually remove unused LVs")
4038           continue
4039
4040   def _ExecD8Secondary(self, feedback_fn):
4041     """Replace the secondary node for drbd8.
4042
4043     The algorithm for replace is quite complicated:
4044       - for all disks of the instance:
4045         - create new LVs on the new node with same names
4046         - shutdown the drbd device on the old secondary
4047         - disconnect the drbd network on the primary
4048         - create the drbd device on the new secondary
4049         - network attach the drbd on the primary, using an artifice:
4050           the drbd code for Attach() will connect to the network if it
4051           finds a device which is connected to the good local disks but
4052           not network enabled
4053       - wait for sync across all devices
4054       - remove all disks from the old secondary
4055
4056     Failures are not very well handled.
4057
4058     """
4059     steps_total = 6
4060     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4061     instance = self.instance
4062     iv_names = {}
4063     vgname = self.cfg.GetVGName()
4064     # start of work
4065     cfg = self.cfg
4066     old_node = self.tgt_node
4067     new_node = self.new_node
4068     pri_node = instance.primary_node
4069
4070     # Step: check device activation
4071     self.proc.LogStep(1, steps_total, "check device existence")
4072     info("checking volume groups")
4073     my_vg = cfg.GetVGName()
4074     results = rpc.call_vg_list([pri_node, new_node])
4075     if not results:
4076       raise errors.OpExecError("Can't list volume groups on the nodes")
4077     for node in pri_node, new_node:
4078       res = results.get(node, False)
4079       if not res or my_vg not in res:
4080         raise errors.OpExecError("Volume group '%s' not found on %s" %
4081                                  (my_vg, node))
4082     for dev in instance.disks:
4083       if not dev.iv_name in self.op.disks:
4084         continue
4085       info("checking %s on %s" % (dev.iv_name, pri_node))
4086       cfg.SetDiskID(dev, pri_node)
4087       if not rpc.call_blockdev_find(pri_node, dev):
4088         raise errors.OpExecError("Can't find device %s on node %s" %
4089                                  (dev.iv_name, pri_node))
4090
4091     # Step: check other node consistency
4092     self.proc.LogStep(2, steps_total, "check peer consistency")
4093     for dev in instance.disks:
4094       if not dev.iv_name in self.op.disks:
4095         continue
4096       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4097       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4098         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4099                                  " unsafe to replace the secondary" %
4100                                  pri_node)
4101
4102     # Step: create new storage
4103     self.proc.LogStep(3, steps_total, "allocate new storage")
4104     for dev in instance.disks:
4105       size = dev.size
4106       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4107       # since we *always* want to create this LV, we use the
4108       # _Create...OnPrimary (which forces the creation), even if we
4109       # are talking about the secondary node
4110       for new_lv in dev.children:
4111         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4112                                         _GetInstanceInfoText(instance)):
4113           raise errors.OpExecError("Failed to create new LV named '%s' on"
4114                                    " node '%s'" %
4115                                    (new_lv.logical_id[1], new_node))
4116
4117       iv_names[dev.iv_name] = (dev, dev.children)
4118
4119     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4120     for dev in instance.disks:
4121       size = dev.size
4122       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4123       # create new devices on new_node
4124       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4125                               logical_id=(pri_node, new_node,
4126                                           dev.logical_id[2]),
4127                               children=dev.children)
4128       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4129                                         new_drbd, False,
4130                                       _GetInstanceInfoText(instance)):
4131         raise errors.OpExecError("Failed to create new DRBD on"
4132                                  " node '%s'" % new_node)
4133
4134     for dev in instance.disks:
4135       # we have new devices, shutdown the drbd on the old secondary
4136       info("shutting down drbd for %s on old node" % dev.iv_name)
4137       cfg.SetDiskID(dev, old_node)
4138       if not rpc.call_blockdev_shutdown(old_node, dev):
4139         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4140                 hint="Please cleanup this device manually as soon as possible")
4141
4142     info("detaching primary drbds from the network (=> standalone)")
4143     done = 0
4144     for dev in instance.disks:
4145       cfg.SetDiskID(dev, pri_node)
4146       # set the physical (unique in bdev terms) id to None, meaning
4147       # detach from network
4148       dev.physical_id = (None,) * len(dev.physical_id)
4149       # and 'find' the device, which will 'fix' it to match the
4150       # standalone state
4151       if rpc.call_blockdev_find(pri_node, dev):
4152         done += 1
4153       else:
4154         warning("Failed to detach drbd %s from network, unusual case" %
4155                 dev.iv_name)
4156
4157     if not done:
4158       # no detaches succeeded (very unlikely)
4159       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4160
4161     # if we managed to detach at least one, we update all the disks of
4162     # the instance to point to the new secondary
4163     info("updating instance configuration")
4164     for dev in instance.disks:
4165       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4166       cfg.SetDiskID(dev, pri_node)
4167     cfg.Update(instance)
4168
4169     # and now perform the drbd attach
4170     info("attaching primary drbds to new secondary (standalone => connected)")
4171     failures = []
4172     for dev in instance.disks:
4173       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4174       # since the attach is smart, it's enough to 'find' the device,
4175       # it will automatically activate the network, if the physical_id
4176       # is correct
4177       cfg.SetDiskID(dev, pri_node)
4178       if not rpc.call_blockdev_find(pri_node, dev):
4179         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4180                 "please do a gnt-instance info to see the status of disks")
4181
4182     # this can fail as the old devices are degraded and _WaitForSync
4183     # does a combined result over all disks, so we don't check its
4184     # return value
4185     self.proc.LogStep(5, steps_total, "sync devices")
4186     _WaitForSync(cfg, instance, self.proc, unlock=True)
4187
4188     # so check manually all the devices
4189     for name, (dev, old_lvs) in iv_names.iteritems():
4190       cfg.SetDiskID(dev, pri_node)
4191       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4192       if is_degr:
4193         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4194
4195     self.proc.LogStep(6, steps_total, "removing old storage")
4196     for name, (dev, old_lvs) in iv_names.iteritems():
4197       info("remove logical volumes for %s" % name)
4198       for lv in old_lvs:
4199         cfg.SetDiskID(lv, old_node)
4200         if not rpc.call_blockdev_remove(old_node, lv):
4201           warning("Can't remove LV on old secondary",
4202                   hint="Cleanup stale volumes by hand")
4203
4204   def Exec(self, feedback_fn):
4205     """Execute disk replacement.
4206
4207     This dispatches the disk replacement to the appropriate handler.
4208
4209     """
4210     instance = self.instance
4211     if instance.disk_template == constants.DT_REMOTE_RAID1:
4212       fn = self._ExecRR1
4213     elif instance.disk_template == constants.DT_DRBD8:
4214       if self.op.remote_node is None:
4215         fn = self._ExecD8DiskOnly
4216       else:
4217         fn = self._ExecD8Secondary
4218     else:
4219       raise errors.ProgrammerError("Unhandled disk replacement case")
4220     return fn(feedback_fn)
4221
4222
4223 class LUQueryInstanceData(NoHooksLU):
4224   """Query runtime instance data.
4225
4226   """
4227   _OP_REQP = ["instances"]
4228
4229   def CheckPrereq(self):
4230     """Check prerequisites.
4231
4232     This only checks the optional instance list against the existing names.
4233
4234     """
4235     if not isinstance(self.op.instances, list):
4236       raise errors.OpPrereqError("Invalid argument type 'instances'")
4237     if self.op.instances:
4238       self.wanted_instances = []
4239       names = self.op.instances
4240       for name in names:
4241         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4242         if instance is None:
4243           raise errors.OpPrereqError("No such instance name '%s'" % name)
4244         self.wanted_instances.append(instance)
4245     else:
4246       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4247                                in self.cfg.GetInstanceList()]
4248     return
4249
4250
4251   def _ComputeDiskStatus(self, instance, snode, dev):
4252     """Compute block device status.
4253
4254     """
4255     self.cfg.SetDiskID(dev, instance.primary_node)
4256     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4257     if dev.dev_type in constants.LDS_DRBD:
4258       # we change the snode then (otherwise we use the one passed in)
4259       if dev.logical_id[0] == instance.primary_node:
4260         snode = dev.logical_id[1]
4261       else:
4262         snode = dev.logical_id[0]
4263
4264     if snode:
4265       self.cfg.SetDiskID(dev, snode)
4266       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4267     else:
4268       dev_sstatus = None
4269
4270     if dev.children:
4271       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4272                       for child in dev.children]
4273     else:
4274       dev_children = []
4275
4276     data = {
4277       "iv_name": dev.iv_name,
4278       "dev_type": dev.dev_type,
4279       "logical_id": dev.logical_id,
4280       "physical_id": dev.physical_id,
4281       "pstatus": dev_pstatus,
4282       "sstatus": dev_sstatus,
4283       "children": dev_children,
4284       }
4285
4286     return data
4287
4288   def Exec(self, feedback_fn):
4289     """Gather and return data"""
4290     result = {}
4291     for instance in self.wanted_instances:
4292       remote_info = rpc.call_instance_info(instance.primary_node,
4293                                                 instance.name)
4294       if remote_info and "state" in remote_info:
4295         remote_state = "up"
4296       else:
4297         remote_state = "down"
4298       if instance.status == "down":
4299         config_state = "down"
4300       else:
4301         config_state = "up"
4302
4303       disks = [self._ComputeDiskStatus(instance, None, device)
4304                for device in instance.disks]
4305
4306       idict = {
4307         "name": instance.name,
4308         "config_state": config_state,
4309         "run_state": remote_state,
4310         "pnode": instance.primary_node,
4311         "snodes": instance.secondary_nodes,
4312         "os": instance.os,
4313         "memory": instance.memory,
4314         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4315         "disks": disks,
4316         "network_port": instance.network_port,
4317         "vcpus": instance.vcpus,
4318         "kernel_path": instance.kernel_path,
4319         "initrd_path": instance.initrd_path,
4320         "hvm_boot_order": instance.hvm_boot_order,
4321         }
4322
4323       result[instance.name] = idict
4324
4325     return result
4326
4327
4328 class LUSetInstanceParms(LogicalUnit):
4329   """Modifies an instances's parameters.
4330
4331   """
4332   HPATH = "instance-modify"
4333   HTYPE = constants.HTYPE_INSTANCE
4334   _OP_REQP = ["instance_name"]
4335
4336   def BuildHooksEnv(self):
4337     """Build hooks env.
4338
4339     This runs on the master, primary and secondaries.
4340
4341     """
4342     args = dict()
4343     if self.mem:
4344       args['memory'] = self.mem
4345     if self.vcpus:
4346       args['vcpus'] = self.vcpus
4347     if self.do_ip or self.do_bridge or self.mac:
4348       if self.do_ip:
4349         ip = self.ip
4350       else:
4351         ip = self.instance.nics[0].ip
4352       if self.bridge:
4353         bridge = self.bridge
4354       else:
4355         bridge = self.instance.nics[0].bridge
4356       if self.mac:
4357         mac = self.mac
4358       else:
4359         mac = self.instance.nics[0].mac
4360       args['nics'] = [(ip, bridge, mac)]
4361     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4362     nl = [self.sstore.GetMasterNode(),
4363           self.instance.primary_node] + list(self.instance.secondary_nodes)
4364     return env, nl, nl
4365
4366   def CheckPrereq(self):
4367     """Check prerequisites.
4368
4369     This only checks the instance list against the existing names.
4370
4371     """
4372     self.mem = getattr(self.op, "mem", None)
4373     self.vcpus = getattr(self.op, "vcpus", None)
4374     self.ip = getattr(self.op, "ip", None)
4375     self.mac = getattr(self.op, "mac", None)
4376     self.bridge = getattr(self.op, "bridge", None)
4377     self.kernel_path = getattr(self.op, "kernel_path", None)
4378     self.initrd_path = getattr(self.op, "initrd_path", None)
4379     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4380     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4381                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
4382     if all_parms.count(None) == len(all_parms):
4383       raise errors.OpPrereqError("No changes submitted")
4384     if self.mem is not None:
4385       try:
4386         self.mem = int(self.mem)
4387       except ValueError, err:
4388         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4389     if self.vcpus is not None:
4390       try:
4391         self.vcpus = int(self.vcpus)
4392       except ValueError, err:
4393         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4394     if self.ip is not None:
4395       self.do_ip = True
4396       if self.ip.lower() == "none":
4397         self.ip = None
4398       else:
4399         if not utils.IsValidIP(self.ip):
4400           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4401     else:
4402       self.do_ip = False
4403     self.do_bridge = (self.bridge is not None)
4404     if self.mac is not None:
4405       if self.cfg.IsMacInUse(self.mac):
4406         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4407                                    self.mac)
4408       if not utils.IsValidMac(self.mac):
4409         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4410
4411     if self.kernel_path is not None:
4412       self.do_kernel_path = True
4413       if self.kernel_path == constants.VALUE_NONE:
4414         raise errors.OpPrereqError("Can't set instance to no kernel")
4415
4416       if self.kernel_path != constants.VALUE_DEFAULT:
4417         if not os.path.isabs(self.kernel_path):
4418           raise errors.OpPrereqError("The kernel path must be an absolute"
4419                                     " filename")
4420     else:
4421       self.do_kernel_path = False
4422
4423     if self.initrd_path is not None:
4424       self.do_initrd_path = True
4425       if self.initrd_path not in (constants.VALUE_NONE,
4426                                   constants.VALUE_DEFAULT):
4427         if not os.path.isabs(self.initrd_path):
4428           raise errors.OpPrereqError("The initrd path must be an absolute"
4429                                     " filename")
4430     else:
4431       self.do_initrd_path = False
4432
4433     # boot order verification
4434     if self.hvm_boot_order is not None:
4435       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4436         if len(self.hvm_boot_order.strip("acdn")) != 0:
4437           raise errors.OpPrereqError("invalid boot order specified,"
4438                                      " must be one or more of [acdn]"
4439                                      " or 'default'")
4440
4441     instance = self.cfg.GetInstanceInfo(
4442       self.cfg.ExpandInstanceName(self.op.instance_name))
4443     if instance is None:
4444       raise errors.OpPrereqError("No such instance name '%s'" %
4445                                  self.op.instance_name)
4446     self.op.instance_name = instance.name
4447     self.instance = instance
4448     return
4449
4450   def Exec(self, feedback_fn):
4451     """Modifies an instance.
4452
4453     All parameters take effect only at the next restart of the instance.
4454     """
4455     result = []
4456     instance = self.instance
4457     if self.mem:
4458       instance.memory = self.mem
4459       result.append(("mem", self.mem))
4460     if self.vcpus:
4461       instance.vcpus = self.vcpus
4462       result.append(("vcpus",  self.vcpus))
4463     if self.do_ip:
4464       instance.nics[0].ip = self.ip
4465       result.append(("ip", self.ip))
4466     if self.bridge:
4467       instance.nics[0].bridge = self.bridge
4468       result.append(("bridge", self.bridge))
4469     if self.mac:
4470       instance.nics[0].mac = self.mac
4471       result.append(("mac", self.mac))
4472     if self.do_kernel_path:
4473       instance.kernel_path = self.kernel_path
4474       result.append(("kernel_path", self.kernel_path))
4475     if self.do_initrd_path:
4476       instance.initrd_path = self.initrd_path
4477       result.append(("initrd_path", self.initrd_path))
4478     if self.hvm_boot_order:
4479       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4480         instance.hvm_boot_order = None
4481       else:
4482         instance.hvm_boot_order = self.hvm_boot_order
4483       result.append(("hvm_boot_order", self.hvm_boot_order))
4484
4485     self.cfg.AddInstance(instance)
4486
4487     return result
4488
4489
4490 class LUQueryExports(NoHooksLU):
4491   """Query the exports list
4492
4493   """
4494   _OP_REQP = []
4495
4496   def CheckPrereq(self):
4497     """Check that the nodelist contains only existing nodes.
4498
4499     """
4500     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4501
4502   def Exec(self, feedback_fn):
4503     """Compute the list of all the exported system images.
4504
4505     Returns:
4506       a dictionary with the structure node->(export-list)
4507       where export-list is a list of the instances exported on
4508       that node.
4509
4510     """
4511     return rpc.call_export_list(self.nodes)
4512
4513
4514 class LUExportInstance(LogicalUnit):
4515   """Export an instance to an image in the cluster.
4516
4517   """
4518   HPATH = "instance-export"
4519   HTYPE = constants.HTYPE_INSTANCE
4520   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4521
4522   def BuildHooksEnv(self):
4523     """Build hooks env.
4524
4525     This will run on the master, primary node and target node.
4526
4527     """
4528     env = {
4529       "EXPORT_NODE": self.op.target_node,
4530       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4531       }
4532     env.update(_BuildInstanceHookEnvByObject(self.instance))
4533     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4534           self.op.target_node]
4535     return env, nl, nl
4536
4537   def CheckPrereq(self):
4538     """Check prerequisites.
4539
4540     This checks that the instance and node names are valid.
4541
4542     """
4543     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4544     self.instance = self.cfg.GetInstanceInfo(instance_name)
4545     if self.instance is None:
4546       raise errors.OpPrereqError("Instance '%s' not found" %
4547                                  self.op.instance_name)
4548
4549     # node verification
4550     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4551     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4552
4553     if self.dst_node is None:
4554       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4555                                  self.op.target_node)
4556     self.op.target_node = self.dst_node.name
4557
4558   def Exec(self, feedback_fn):
4559     """Export an instance to an image in the cluster.
4560
4561     """
4562     instance = self.instance
4563     dst_node = self.dst_node
4564     src_node = instance.primary_node
4565     if self.op.shutdown:
4566       # shutdown the instance, but not the disks
4567       if not rpc.call_instance_shutdown(src_node, instance):
4568         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4569                                  (instance.name, src_node))
4570
4571     vgname = self.cfg.GetVGName()
4572
4573     snap_disks = []
4574
4575     try:
4576       for disk in instance.disks:
4577         if disk.iv_name == "sda":
4578           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4579           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4580
4581           if not new_dev_name:
4582             logger.Error("could not snapshot block device %s on node %s" %
4583                          (disk.logical_id[1], src_node))
4584           else:
4585             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4586                                       logical_id=(vgname, new_dev_name),
4587                                       physical_id=(vgname, new_dev_name),
4588                                       iv_name=disk.iv_name)
4589             snap_disks.append(new_dev)
4590
4591     finally:
4592       if self.op.shutdown and instance.status == "up":
4593         if not rpc.call_instance_start(src_node, instance, None):
4594           _ShutdownInstanceDisks(instance, self.cfg)
4595           raise errors.OpExecError("Could not start instance")
4596
4597     # TODO: check for size
4598
4599     for dev in snap_disks:
4600       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4601                                            instance):
4602         logger.Error("could not export block device %s from node"
4603                      " %s to node %s" %
4604                      (dev.logical_id[1], src_node, dst_node.name))
4605       if not rpc.call_blockdev_remove(src_node, dev):
4606         logger.Error("could not remove snapshot block device %s from"
4607                      " node %s" % (dev.logical_id[1], src_node))
4608
4609     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4610       logger.Error("could not finalize export for instance %s on node %s" %
4611                    (instance.name, dst_node.name))
4612
4613     nodelist = self.cfg.GetNodeList()
4614     nodelist.remove(dst_node.name)
4615
4616     # on one-node clusters nodelist will be empty after the removal
4617     # if we proceed the backup would be removed because OpQueryExports
4618     # substitutes an empty list with the full cluster node list.
4619     if nodelist:
4620       op = opcodes.OpQueryExports(nodes=nodelist)
4621       exportlist = self.proc.ChainOpCode(op)
4622       for node in exportlist:
4623         if instance.name in exportlist[node]:
4624           if not rpc.call_export_remove(node, instance.name):
4625             logger.Error("could not remove older export for instance %s"
4626                          " on node %s" % (instance.name, node))
4627
4628
4629 class LURemoveExport(NoHooksLU):
4630   """Remove exports related to the named instance.
4631
4632   """
4633   _OP_REQP = ["instance_name"]
4634
4635   def CheckPrereq(self):
4636     """Check prerequisites.
4637     """
4638     pass
4639
4640   def Exec(self, feedback_fn):
4641     """Remove any export.
4642
4643     """
4644     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4645     # If the instance was not found we'll try with the name that was passed in.
4646     # This will only work if it was an FQDN, though.
4647     fqdn_warn = False
4648     if not instance_name:
4649       fqdn_warn = True
4650       instance_name = self.op.instance_name
4651
4652     op = opcodes.OpQueryExports(nodes=[])
4653     exportlist = self.proc.ChainOpCode(op)
4654     found = False
4655     for node in exportlist:
4656       if instance_name in exportlist[node]:
4657         found = True
4658         if not rpc.call_export_remove(node, instance_name):
4659           logger.Error("could not remove export for instance %s"
4660                        " on node %s" % (instance_name, node))
4661
4662     if fqdn_warn and not found:
4663       feedback_fn("Export not found. If trying to remove an export belonging"
4664                   " to a deleted instance please use its Fully Qualified"
4665                   " Domain Name.")
4666
4667
4668 class TagsLU(NoHooksLU):
4669   """Generic tags LU.
4670
4671   This is an abstract class which is the parent of all the other tags LUs.
4672
4673   """
4674   def CheckPrereq(self):
4675     """Check prerequisites.
4676
4677     """
4678     if self.op.kind == constants.TAG_CLUSTER:
4679       self.target = self.cfg.GetClusterInfo()
4680     elif self.op.kind == constants.TAG_NODE:
4681       name = self.cfg.ExpandNodeName(self.op.name)
4682       if name is None:
4683         raise errors.OpPrereqError("Invalid node name (%s)" %
4684                                    (self.op.name,))
4685       self.op.name = name
4686       self.target = self.cfg.GetNodeInfo(name)
4687     elif self.op.kind == constants.TAG_INSTANCE:
4688       name = self.cfg.ExpandInstanceName(self.op.name)
4689       if name is None:
4690         raise errors.OpPrereqError("Invalid instance name (%s)" %
4691                                    (self.op.name,))
4692       self.op.name = name
4693       self.target = self.cfg.GetInstanceInfo(name)
4694     else:
4695       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4696                                  str(self.op.kind))
4697
4698
4699 class LUGetTags(TagsLU):
4700   """Returns the tags of a given object.
4701
4702   """
4703   _OP_REQP = ["kind", "name"]
4704
4705   def Exec(self, feedback_fn):
4706     """Returns the tag list.
4707
4708     """
4709     return self.target.GetTags()
4710
4711
4712 class LUSearchTags(NoHooksLU):
4713   """Searches the tags for a given pattern.
4714
4715   """
4716   _OP_REQP = ["pattern"]
4717
4718   def CheckPrereq(self):
4719     """Check prerequisites.
4720
4721     This checks the pattern passed for validity by compiling it.
4722
4723     """
4724     try:
4725       self.re = re.compile(self.op.pattern)
4726     except re.error, err:
4727       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4728                                  (self.op.pattern, err))
4729
4730   def Exec(self, feedback_fn):
4731     """Returns the tag list.
4732
4733     """
4734     cfg = self.cfg
4735     tgts = [("/cluster", cfg.GetClusterInfo())]
4736     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4737     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4738     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4739     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4740     results = []
4741     for path, target in tgts:
4742       for tag in target.GetTags():
4743         if self.re.search(tag):
4744           results.append((path, tag))
4745     return results
4746
4747
4748 class LUAddTags(TagsLU):
4749   """Sets a tag on a given object.
4750
4751   """
4752   _OP_REQP = ["kind", "name", "tags"]
4753
4754   def CheckPrereq(self):
4755     """Check prerequisites.
4756
4757     This checks the type and length of the tag name and value.
4758
4759     """
4760     TagsLU.CheckPrereq(self)
4761     for tag in self.op.tags:
4762       objects.TaggableObject.ValidateTag(tag)
4763
4764   def Exec(self, feedback_fn):
4765     """Sets the tag.
4766
4767     """
4768     try:
4769       for tag in self.op.tags:
4770         self.target.AddTag(tag)
4771     except errors.TagError, err:
4772       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4773     try:
4774       self.cfg.Update(self.target)
4775     except errors.ConfigurationError:
4776       raise errors.OpRetryError("There has been a modification to the"
4777                                 " config file and the operation has been"
4778                                 " aborted. Please retry.")
4779
4780
4781 class LUDelTags(TagsLU):
4782   """Delete a list of tags from a given object.
4783
4784   """
4785   _OP_REQP = ["kind", "name", "tags"]
4786
4787   def CheckPrereq(self):
4788     """Check prerequisites.
4789
4790     This checks that we have the given tag.
4791
4792     """
4793     TagsLU.CheckPrereq(self)
4794     for tag in self.op.tags:
4795       objects.TaggableObject.ValidateTag(tag)
4796     del_tags = frozenset(self.op.tags)
4797     cur_tags = self.target.GetTags()
4798     if not del_tags <= cur_tags:
4799       diff_tags = del_tags - cur_tags
4800       diff_names = ["'%s'" % tag for tag in diff_tags]
4801       diff_names.sort()
4802       raise errors.OpPrereqError("Tag(s) %s not found" %
4803                                  (",".join(diff_names)))
4804
4805   def Exec(self, feedback_fn):
4806     """Remove the tag from the object.
4807
4808     """
4809     for tag in self.op.tags:
4810       self.target.RemoveTag(tag)
4811     try:
4812       self.cfg.Update(self.target)
4813     except errors.ConfigurationError:
4814       raise errors.OpRetryError("There has been a modification to the"
4815                                 " config file and the operation has been"
4816                                 " aborted. Please retry.")
4817
4818 class LUTestDelay(NoHooksLU):
4819   """Sleep for a specified amount of time.
4820
4821   This LU sleeps on the master and/or nodes for a specified amoutn of
4822   time.
4823
4824   """
4825   _OP_REQP = ["duration", "on_master", "on_nodes"]
4826
4827   def CheckPrereq(self):
4828     """Check prerequisites.
4829
4830     This checks that we have a good list of nodes and/or the duration
4831     is valid.
4832
4833     """
4834
4835     if self.op.on_nodes:
4836       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4837
4838   def Exec(self, feedback_fn):
4839     """Do the actual sleep.
4840
4841     """
4842     if self.op.on_master:
4843       if not utils.TestDelay(self.op.duration):
4844         raise errors.OpExecError("Error during master delay test")
4845     if self.op.on_nodes:
4846       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4847       if not result:
4848         raise errors.OpExecError("Complete failure from rpc call")
4849       for node, node_result in result.items():
4850         if not node_result:
4851           raise errors.OpExecError("Failure during rpc call to node %s,"
4852                                    " result: %s" % (node, node_result))
4853
4854
4855 class IAllocator(object):
4856   """IAllocator framework.
4857
4858   An IAllocator instance has three sets of attributes:
4859     - cfg/sstore that are needed to query the cluster
4860     - input data (all members of the _KEYS class attribute are required)
4861     - four buffer attributes (in|out_data|text), that represent the
4862       input (to the external script) in text and data structure format,
4863       and the output from it, again in two formats
4864     - the result variables from the script (success, info, nodes) for
4865       easy usage
4866
4867   """
4868   _ALLO_KEYS = [
4869     "mem_size", "disks", "disk_template",
4870     "os", "tags", "nics", "vcpus",
4871     ]
4872   _RELO_KEYS = [
4873     "relocate_from",
4874     ]
4875
4876   def __init__(self, cfg, sstore, mode, name, **kwargs):
4877     self.cfg = cfg
4878     self.sstore = sstore
4879     # init buffer variables
4880     self.in_text = self.out_text = self.in_data = self.out_data = None
4881     # init all input fields so that pylint is happy
4882     self.mode = mode
4883     self.name = name
4884     self.mem_size = self.disks = self.disk_template = None
4885     self.os = self.tags = self.nics = self.vcpus = None
4886     self.relocate_from = None
4887     # computed fields
4888     self.required_nodes = None
4889     # init result fields
4890     self.success = self.info = self.nodes = None
4891     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4892       keyset = self._ALLO_KEYS
4893     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4894       keyset = self._RELO_KEYS
4895     else:
4896       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4897                                    " IAllocator" % self.mode)
4898     for key in kwargs:
4899       if key not in keyset:
4900         raise errors.ProgrammerError("Invalid input parameter '%s' to"
4901                                      " IAllocator" % key)
4902       setattr(self, key, kwargs[key])
4903     for key in keyset:
4904       if key not in kwargs:
4905         raise errors.ProgrammerError("Missing input parameter '%s' to"
4906                                      " IAllocator" % key)
4907     self._BuildInputData()
4908
4909   def _ComputeClusterData(self):
4910     """Compute the generic allocator input data.
4911
4912     This is the data that is independent of the actual operation.
4913
4914     """
4915     cfg = self.cfg
4916     # cluster data
4917     data = {
4918       "version": 1,
4919       "cluster_name": self.sstore.GetClusterName(),
4920       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4921       # we don't have job IDs
4922       }
4923
4924     # node data
4925     node_results = {}
4926     node_list = cfg.GetNodeList()
4927     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4928     for nname in node_list:
4929       ninfo = cfg.GetNodeInfo(nname)
4930       if nname not in node_data or not isinstance(node_data[nname], dict):
4931         raise errors.OpExecError("Can't get data for node %s" % nname)
4932       remote_info = node_data[nname]
4933       for attr in ['memory_total', 'memory_free',
4934                    'vg_size', 'vg_free']:
4935         if attr not in remote_info:
4936           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4937                                    (nname, attr))
4938         try:
4939           int(remote_info[attr])
4940         except ValueError, err:
4941           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4942                                    " %s" % (nname, attr, str(err)))
4943       pnr = {
4944         "tags": list(ninfo.GetTags()),
4945         "total_memory": utils.TryConvert(int, remote_info['memory_total']),
4946         "free_memory": utils.TryConvert(int, remote_info['memory_free']),
4947         "total_disk": utils.TryConvert(int, remote_info['vg_size']),
4948         "free_disk": utils.TryConvert(int, remote_info['vg_free']),
4949         "primary_ip": ninfo.primary_ip,
4950         "secondary_ip": ninfo.secondary_ip,
4951         }
4952       node_results[nname] = pnr
4953     data["nodes"] = node_results
4954
4955     # instance data
4956     instance_data = {}
4957     i_list = cfg.GetInstanceList()
4958     for iname in i_list:
4959       iinfo = cfg.GetInstanceInfo(iname)
4960       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4961                   for n in iinfo.nics]
4962       pir = {
4963         "tags": list(iinfo.GetTags()),
4964         "should_run": iinfo.status == "up",
4965         "vcpus": iinfo.vcpus,
4966         "memory": iinfo.memory,
4967         "os": iinfo.os,
4968         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4969         "nics": nic_data,
4970         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4971         "disk_template": iinfo.disk_template,
4972         }
4973       instance_data[iname] = pir
4974
4975     data["instances"] = instance_data
4976
4977     self.in_data = data
4978
4979   def _AddNewInstance(self):
4980     """Add new instance data to allocator structure.
4981
4982     This in combination with _AllocatorGetClusterData will create the
4983     correct structure needed as input for the allocator.
4984
4985     The checks for the completeness of the opcode must have already been
4986     done.
4987
4988     """
4989     data = self.in_data
4990     if len(self.disks) != 2:
4991       raise errors.OpExecError("Only two-disk configurations supported")
4992
4993     disk_space = _ComputeDiskSize(self.disk_template,
4994                                   self.disks[0]["size"], self.disks[1]["size"])
4995
4996     if self.disk_template in constants.DTS_NET_MIRROR:
4997       self.required_nodes = 2
4998     else:
4999       self.required_nodes = 1
5000     request = {
5001       "type": "allocate",
5002       "name": self.name,
5003       "disk_template": self.disk_template,
5004       "tags": self.tags,
5005       "os": self.os,
5006       "vcpus": self.vcpus,
5007       "memory": self.mem_size,
5008       "disks": self.disks,
5009       "disk_space_total": disk_space,
5010       "nics": self.nics,
5011       "required_nodes": self.required_nodes,
5012       }
5013     data["request"] = request
5014
5015   def _AddRelocateInstance(self):
5016     """Add relocate instance data to allocator structure.
5017
5018     This in combination with _IAllocatorGetClusterData will create the
5019     correct structure needed as input for the allocator.
5020
5021     The checks for the completeness of the opcode must have already been
5022     done.
5023
5024     """
5025     instance = self.cfg.GetInstanceInfo(self.name)
5026     if instance is None:
5027       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5028                                    " IAllocator" % self.name)
5029
5030     if instance.disk_template not in constants.DTS_NET_MIRROR:
5031       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5032
5033     if len(instance.secondary_nodes) != 1:
5034       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5035
5036     self.required_nodes = 1
5037
5038     disk_space = _ComputeDiskSize(instance.disk_template,
5039                                   instance.disks[0].size,
5040                                   instance.disks[1].size)
5041
5042     request = {
5043       "type": "relocate",
5044       "name": self.name,
5045       "disk_space_total": disk_space,
5046       "required_nodes": self.required_nodes,
5047       "relocate_from": self.relocate_from,
5048       }
5049     self.in_data["request"] = request
5050
5051   def _BuildInputData(self):
5052     """Build input data structures.
5053
5054     """
5055     self._ComputeClusterData()
5056
5057     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5058       self._AddNewInstance()
5059     else:
5060       self._AddRelocateInstance()
5061
5062     self.in_text = serializer.Dump(self.in_data)
5063
5064   def Run(self, name, validate=True):
5065     """Run an instance allocator and return the results.
5066
5067     """
5068     data = self.in_text
5069
5070     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
5071                                   os.path.isfile)
5072     if alloc_script is None:
5073       raise errors.OpExecError("Can't find allocator '%s'" % name)
5074
5075     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
5076     try:
5077       os.write(fd, data)
5078       os.close(fd)
5079       result = utils.RunCmd([alloc_script, fin_name])
5080       if result.failed:
5081         raise errors.OpExecError("Instance allocator call failed: %s,"
5082                                  " output: %s" %
5083                                  (result.fail_reason, result.output))
5084     finally:
5085       os.unlink(fin_name)
5086     self.out_text = result.stdout
5087     if validate:
5088       self._ValidateResult()
5089
5090   def _ValidateResult(self):
5091     """Process the allocator results.
5092
5093     This will process and if successful save the result in
5094     self.out_data and the other parameters.
5095
5096     """
5097     try:
5098       rdict = serializer.Load(self.out_text)
5099     except Exception, err:
5100       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5101
5102     if not isinstance(rdict, dict):
5103       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5104
5105     for key in "success", "info", "nodes":
5106       if key not in rdict:
5107         raise errors.OpExecError("Can't parse iallocator results:"
5108                                  " missing key '%s'" % key)
5109       setattr(self, key, rdict[key])
5110
5111     if not isinstance(rdict["nodes"], list):
5112       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5113                                " is not a list")
5114     self.out_data = rdict
5115
5116
5117 class LUTestAllocator(NoHooksLU):
5118   """Run allocator tests.
5119
5120   This LU runs the allocator tests
5121
5122   """
5123   _OP_REQP = ["direction", "mode", "name"]
5124
5125   def CheckPrereq(self):
5126     """Check prerequisites.
5127
5128     This checks the opcode parameters depending on the director and mode test.
5129
5130     """
5131     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5132       for attr in ["name", "mem_size", "disks", "disk_template",
5133                    "os", "tags", "nics", "vcpus"]:
5134         if not hasattr(self.op, attr):
5135           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5136                                      attr)
5137       iname = self.cfg.ExpandInstanceName(self.op.name)
5138       if iname is not None:
5139         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5140                                    iname)
5141       if not isinstance(self.op.nics, list):
5142         raise errors.OpPrereqError("Invalid parameter 'nics'")
5143       for row in self.op.nics:
5144         if (not isinstance(row, dict) or
5145             "mac" not in row or
5146             "ip" not in row or
5147             "bridge" not in row):
5148           raise errors.OpPrereqError("Invalid contents of the"
5149                                      " 'nics' parameter")
5150       if not isinstance(self.op.disks, list):
5151         raise errors.OpPrereqError("Invalid parameter 'disks'")
5152       if len(self.op.disks) != 2:
5153         raise errors.OpPrereqError("Only two-disk configurations supported")
5154       for row in self.op.disks:
5155         if (not isinstance(row, dict) or
5156             "size" not in row or
5157             not isinstance(row["size"], int) or
5158             "mode" not in row or
5159             row["mode"] not in ['r', 'w']):
5160           raise errors.OpPrereqError("Invalid contents of the"
5161                                      " 'disks' parameter")
5162     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5163       if not hasattr(self.op, "name"):
5164         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5165       fname = self.cfg.ExpandInstanceName(self.op.name)
5166       if fname is None:
5167         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5168                                    self.op.name)
5169       self.op.name = fname
5170       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5171     else:
5172       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5173                                  self.op.mode)
5174
5175     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5176       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5177         raise errors.OpPrereqError("Missing allocator name")
5178     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5179       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5180                                  self.op.direction)
5181
5182   def Exec(self, feedback_fn):
5183     """Run the allocator test.
5184
5185     """
5186     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5187       ial = IAllocator(self.cfg, self.sstore,
5188                        mode=self.op.mode,
5189                        name=self.op.name,
5190                        mem_size=self.op.mem_size,
5191                        disks=self.op.disks,
5192                        disk_template=self.op.disk_template,
5193                        os=self.op.os,
5194                        tags=self.op.tags,
5195                        nics=self.op.nics,
5196                        vcpus=self.op.vcpus,
5197                        )
5198     else:
5199       ial = IAllocator(self.cfg, self.sstore,
5200                        mode=self.op.mode,
5201                        name=self.op.name,
5202                        relocate_from=list(self.relocate_from),
5203                        )
5204
5205     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5206       result = ial.in_text
5207     else:
5208       ial.Run(self.op.allocator, validate=False)
5209       result = ial.out_text
5210     return result