Add instance port support.
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _AddHostToEtcHosts(hostname):
167   """Wrapper around utils.SetEtcHostsEntry.
168
169   """
170   hi = utils.HostInfo(name=hostname)
171   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172
173
174 def _RemoveHostFromEtcHosts(hostname):
175   """Wrapper around utils.RemoveEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181
182
183 def _GetWantedNodes(lu, nodes):
184   """Returns list of checked and expanded node names.
185
186   Args:
187     nodes: List of nodes (strings) or None for all
188
189   """
190   if not isinstance(nodes, list):
191     raise errors.OpPrereqError("Invalid argument type 'nodes'")
192
193   if nodes:
194     wanted = []
195
196     for name in nodes:
197       node = lu.cfg.ExpandNodeName(name)
198       if node is None:
199         raise errors.OpPrereqError("No such node name '%s'" % name)
200       wanted.append(node)
201
202   else:
203     wanted = lu.cfg.GetNodeList()
204   return utils.NiceSort(wanted)
205
206
207 def _GetWantedInstances(lu, instances):
208   """Returns list of checked and expanded instance names.
209
210   Args:
211     instances: List of instances (strings) or None for all
212
213   """
214   if not isinstance(instances, list):
215     raise errors.OpPrereqError("Invalid argument type 'instances'")
216
217   if instances:
218     wanted = []
219
220     for name in instances:
221       instance = lu.cfg.ExpandInstanceName(name)
222       if instance is None:
223         raise errors.OpPrereqError("No such instance name '%s'" % name)
224       wanted.append(instance)
225
226   else:
227     wanted = lu.cfg.GetInstanceList()
228   return utils.NiceSort(wanted)
229
230
231 def _CheckOutputFields(static, dynamic, selected):
232   """Checks whether all selected fields are valid.
233
234   Args:
235     static: Static fields
236     dynamic: Dynamic fields
237
238   """
239   static_fields = frozenset(static)
240   dynamic_fields = frozenset(dynamic)
241
242   all_fields = static_fields | dynamic_fields
243
244   if not all_fields.issuperset(selected):
245     raise errors.OpPrereqError("Unknown output fields selected: %s"
246                                % ",".join(frozenset(selected).
247                                           difference(all_fields)))
248
249
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251                           memory, vcpus, nics):
252   """Builds instance related env variables for hooks from single variables.
253
254   Args:
255     secondary_nodes: List of secondary nodes as strings
256   """
257   env = {
258     "OP_TARGET": name,
259     "INSTANCE_NAME": name,
260     "INSTANCE_PRIMARY": primary_node,
261     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262     "INSTANCE_OS_TYPE": os_type,
263     "INSTANCE_STATUS": status,
264     "INSTANCE_MEMORY": memory,
265     "INSTANCE_VCPUS": vcpus,
266   }
267
268   if nics:
269     nic_count = len(nics)
270     for idx, (ip, bridge) in enumerate(nics):
271       if ip is None:
272         ip = ""
273       env["INSTANCE_NIC%d_IP" % idx] = ip
274       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275   else:
276     nic_count = 0
277
278   env["INSTANCE_NIC_COUNT"] = nic_count
279
280   return env
281
282
283 def _BuildInstanceHookEnvByObject(instance, override=None):
284   """Builds instance related env variables for hooks from an object.
285
286   Args:
287     instance: objects.Instance object of instance
288     override: dict of values to override
289   """
290   args = {
291     'name': instance.name,
292     'primary_node': instance.primary_node,
293     'secondary_nodes': instance.secondary_nodes,
294     'os_type': instance.os,
295     'status': instance.os,
296     'memory': instance.memory,
297     'vcpus': instance.vcpus,
298     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
299   }
300   if override:
301     args.update(override)
302   return _BuildInstanceHookEnv(**args)
303
304
305 def _UpdateKnownHosts(fullnode, ip, pubkey):
306   """Ensure a node has a correct known_hosts entry.
307
308   Args:
309     fullnode - Fully qualified domain name of host. (str)
310     ip       - IPv4 address of host (str)
311     pubkey   - the public key of the cluster
312
313   """
314   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
316   else:
317     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
318
319   inthere = False
320
321   save_lines = []
322   add_lines = []
323   removed = False
324
325   for rawline in f:
326     logger.Debug('read %s' % (repr(rawline),))
327
328     parts = rawline.rstrip('\r\n').split()
329
330     # Ignore unwanted lines
331     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
332       fields = parts[0].split(',')
333       key = parts[2]
334
335       haveall = True
336       havesome = False
337       for spec in [ ip, fullnode ]:
338         if spec not in fields:
339           haveall = False
340         if spec in fields:
341           havesome = True
342
343       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344       if haveall and key == pubkey:
345         inthere = True
346         save_lines.append(rawline)
347         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
348         continue
349
350       if havesome and (not haveall or key != pubkey):
351         removed = True
352         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
353         continue
354
355     save_lines.append(rawline)
356
357   if not inthere:
358     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
360
361   if removed:
362     save_lines = save_lines + add_lines
363
364     # Write a new file and replace old.
365     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
366                                    constants.DATA_DIR)
367     newfile = os.fdopen(fd, 'w')
368     try:
369       newfile.write(''.join(save_lines))
370     finally:
371       newfile.close()
372     logger.Debug("Wrote new known_hosts.")
373     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
374
375   elif add_lines:
376     # Simply appending a new line will do the trick.
377     f.seek(0, 2)
378     for add in add_lines:
379       f.write(add)
380
381   f.close()
382
383
384 def _HasValidVG(vglist, vgname):
385   """Checks if the volume group list is valid.
386
387   A non-None return value means there's an error, and the return value
388   is the error message.
389
390   """
391   vgsize = vglist.get(vgname, None)
392   if vgsize is None:
393     return "volume group '%s' missing" % vgname
394   elif vgsize < 20480:
395     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
396             (vgname, vgsize))
397   return None
398
399
400 def _InitSSHSetup(node):
401   """Setup the SSH configuration for the cluster.
402
403
404   This generates a dsa keypair for root, adds the pub key to the
405   permitted hosts and adds the hostkey to its own known hosts.
406
407   Args:
408     node: the name of this host as a fqdn
409
410   """
411   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
412
413   for name in priv_key, pub_key:
414     if os.path.exists(name):
415       utils.CreateBackup(name)
416     utils.RemoveFile(name)
417
418   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
419                          "-f", priv_key,
420                          "-q", "-N", ""])
421   if result.failed:
422     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
423                              result.output)
424
425   f = open(pub_key, 'r')
426   try:
427     utils.AddAuthorizedKey(auth_keys, f.read(8192))
428   finally:
429     f.close()
430
431
432 def _InitGanetiServerSetup(ss):
433   """Setup the necessary configuration for the initial node daemon.
434
435   This creates the nodepass file containing the shared password for
436   the cluster and also generates the SSL certificate.
437
438   """
439   # Create pseudo random password
440   randpass = sha.new(os.urandom(64)).hexdigest()
441   # and write it into sstore
442   ss.SetKey(ss.SS_NODED_PASS, randpass)
443
444   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445                          "-days", str(365*5), "-nodes", "-x509",
446                          "-keyout", constants.SSL_CERT_FILE,
447                          "-out", constants.SSL_CERT_FILE, "-batch"])
448   if result.failed:
449     raise errors.OpExecError("could not generate server ssl cert, command"
450                              " %s had exitcode %s and error message %s" %
451                              (result.cmd, result.exit_code, result.output))
452
453   os.chmod(constants.SSL_CERT_FILE, 0400)
454
455   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
456
457   if result.failed:
458     raise errors.OpExecError("Could not start the node daemon, command %s"
459                              " had exitcode %s and error %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462
463 def _CheckInstanceBridgesExist(instance):
464   """Check that the brigdes needed by an instance exist.
465
466   """
467   # check bridges existance
468   brlist = [nic.bridge for nic in instance.nics]
469   if not rpc.call_bridges_exist(instance.primary_node, brlist):
470     raise errors.OpPrereqError("one or more target bridges %s does not"
471                                " exist on destination node '%s'" %
472                                (brlist, instance.primary_node))
473
474
475 class LUInitCluster(LogicalUnit):
476   """Initialise the cluster.
477
478   """
479   HPATH = "cluster-init"
480   HTYPE = constants.HTYPE_CLUSTER
481   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482               "def_bridge", "master_netdev"]
483   REQ_CLUSTER = False
484
485   def BuildHooksEnv(self):
486     """Build hooks env.
487
488     Notes: Since we don't require a cluster, we must manually add
489     ourselves in the post-run node list.
490
491     """
492     env = {"OP_TARGET": self.op.cluster_name}
493     return env, [], [self.hostname.name]
494
495   def CheckPrereq(self):
496     """Verify that the passed name is a valid one.
497
498     """
499     if config.ConfigWriter.IsCluster():
500       raise errors.OpPrereqError("Cluster is already initialised")
501
502     self.hostname = hostname = utils.HostInfo()
503
504     if hostname.ip.startswith("127."):
505       raise errors.OpPrereqError("This host's IP resolves to the private"
506                                  " range (%s). Please fix DNS or /etc/hosts." %
507                                  (hostname.ip,))
508
509     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
510
511     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
512                          constants.DEFAULT_NODED_PORT):
513       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
514                                  " to %s,\nbut this ip address does not"
515                                  " belong to this host."
516                                  " Aborting." % hostname.ip)
517
518     secondary_ip = getattr(self.op, "secondary_ip", None)
519     if secondary_ip and not utils.IsValidIP(secondary_ip):
520       raise errors.OpPrereqError("Invalid secondary ip given")
521     if (secondary_ip and
522         secondary_ip != hostname.ip and
523         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
524                            constants.DEFAULT_NODED_PORT))):
525       raise errors.OpPrereqError("You gave %s as secondary IP,"
526                                  " but it does not belong to this host." %
527                                  secondary_ip)
528     self.secondary_ip = secondary_ip
529
530     # checks presence of the volume group given
531     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
532
533     if vgstatus:
534       raise errors.OpPrereqError("Error: %s" % vgstatus)
535
536     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
537                     self.op.mac_prefix):
538       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
539                                  self.op.mac_prefix)
540
541     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
542       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
543                                  self.op.hypervisor_type)
544
545     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
546     if result.failed:
547       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
548                                  (self.op.master_netdev,
549                                   result.output.strip()))
550
551     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
552             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
553       raise errors.OpPrereqError("Init.d script '%s' missing or not"
554                                  " executable." % constants.NODE_INITD_SCRIPT)
555
556   def Exec(self, feedback_fn):
557     """Initialize the cluster.
558
559     """
560     clustername = self.clustername
561     hostname = self.hostname
562
563     # set up the simple store
564     self.sstore = ss = ssconf.SimpleStore()
565     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
566     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
567     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
568     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
569     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
570
571     # set up the inter-node password and certificate
572     _InitGanetiServerSetup(ss)
573
574     # start the master ip
575     rpc.call_node_start_master(hostname.name)
576
577     # set up ssh config and /etc/hosts
578     f = open(constants.SSH_HOST_RSA_PUB, 'r')
579     try:
580       sshline = f.read()
581     finally:
582       f.close()
583     sshkey = sshline.split(" ")[1]
584
585     _AddHostToEtcHosts(hostname.name)
586
587     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
588
589     _InitSSHSetup(hostname.name)
590
591     # init of cluster config file
592     self.cfg = cfgw = config.ConfigWriter()
593     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
594                     sshkey, self.op.mac_prefix,
595                     self.op.vg_name, self.op.def_bridge)
596
597
598 class LUDestroyCluster(NoHooksLU):
599   """Logical unit for destroying the cluster.
600
601   """
602   _OP_REQP = []
603
604   def CheckPrereq(self):
605     """Check prerequisites.
606
607     This checks whether the cluster is empty.
608
609     Any errors are signalled by raising errors.OpPrereqError.
610
611     """
612     master = self.sstore.GetMasterNode()
613
614     nodelist = self.cfg.GetNodeList()
615     if len(nodelist) != 1 or nodelist[0] != master:
616       raise errors.OpPrereqError("There are still %d node(s) in"
617                                  " this cluster." % (len(nodelist) - 1))
618     instancelist = self.cfg.GetInstanceList()
619     if instancelist:
620       raise errors.OpPrereqError("There are still %d instance(s) in"
621                                  " this cluster." % len(instancelist))
622
623   def Exec(self, feedback_fn):
624     """Destroys the cluster.
625
626     """
627     master = self.sstore.GetMasterNode()
628     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
629     utils.CreateBackup(priv_key)
630     utils.CreateBackup(pub_key)
631     rpc.call_node_leave_cluster(master)
632
633
634 class LUVerifyCluster(NoHooksLU):
635   """Verifies the cluster status.
636
637   """
638   _OP_REQP = []
639
640   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
641                   remote_version, feedback_fn):
642     """Run multiple tests against a node.
643
644     Test list:
645       - compares ganeti version
646       - checks vg existance and size > 20G
647       - checks config file checksum
648       - checks ssh to other nodes
649
650     Args:
651       node: name of the node to check
652       file_list: required list of files
653       local_cksum: dictionary of local files and their checksums
654
655     """
656     # compares ganeti version
657     local_version = constants.PROTOCOL_VERSION
658     if not remote_version:
659       feedback_fn(" - ERROR: connection to %s failed" % (node))
660       return True
661
662     if local_version != remote_version:
663       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
664                       (local_version, node, remote_version))
665       return True
666
667     # checks vg existance and size > 20G
668
669     bad = False
670     if not vglist:
671       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
672                       (node,))
673       bad = True
674     else:
675       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
676       if vgstatus:
677         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
678         bad = True
679
680     # checks config file checksum
681     # checks ssh to any
682
683     if 'filelist' not in node_result:
684       bad = True
685       feedback_fn("  - ERROR: node hasn't returned file checksum data")
686     else:
687       remote_cksum = node_result['filelist']
688       for file_name in file_list:
689         if file_name not in remote_cksum:
690           bad = True
691           feedback_fn("  - ERROR: file '%s' missing" % file_name)
692         elif remote_cksum[file_name] != local_cksum[file_name]:
693           bad = True
694           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
695
696     if 'nodelist' not in node_result:
697       bad = True
698       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
699     else:
700       if node_result['nodelist']:
701         bad = True
702         for node in node_result['nodelist']:
703           feedback_fn("  - ERROR: communication with node '%s': %s" %
704                           (node, node_result['nodelist'][node]))
705     hyp_result = node_result.get('hypervisor', None)
706     if hyp_result is not None:
707       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
708     return bad
709
710   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
711     """Verify an instance.
712
713     This function checks to see if the required block devices are
714     available on the instance's node.
715
716     """
717     bad = False
718
719     instancelist = self.cfg.GetInstanceList()
720     if not instance in instancelist:
721       feedback_fn("  - ERROR: instance %s not in instance list %s" %
722                       (instance, instancelist))
723       bad = True
724
725     instanceconfig = self.cfg.GetInstanceInfo(instance)
726     node_current = instanceconfig.primary_node
727
728     node_vol_should = {}
729     instanceconfig.MapLVsByNode(node_vol_should)
730
731     for node in node_vol_should:
732       for volume in node_vol_should[node]:
733         if node not in node_vol_is or volume not in node_vol_is[node]:
734           feedback_fn("  - ERROR: volume %s missing on node %s" %
735                           (volume, node))
736           bad = True
737
738     if not instanceconfig.status == 'down':
739       if not instance in node_instance[node_current]:
740         feedback_fn("  - ERROR: instance %s not running on node %s" %
741                         (instance, node_current))
742         bad = True
743
744     for node in node_instance:
745       if (not node == node_current):
746         if instance in node_instance[node]:
747           feedback_fn("  - ERROR: instance %s should not run on node %s" %
748                           (instance, node))
749           bad = True
750
751     return bad
752
753   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
754     """Verify if there are any unknown volumes in the cluster.
755
756     The .os, .swap and backup volumes are ignored. All other volumes are
757     reported as unknown.
758
759     """
760     bad = False
761
762     for node in node_vol_is:
763       for volume in node_vol_is[node]:
764         if node not in node_vol_should or volume not in node_vol_should[node]:
765           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
766                       (volume, node))
767           bad = True
768     return bad
769
770   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
771     """Verify the list of running instances.
772
773     This checks what instances are running but unknown to the cluster.
774
775     """
776     bad = False
777     for node in node_instance:
778       for runninginstance in node_instance[node]:
779         if runninginstance not in instancelist:
780           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
781                           (runninginstance, node))
782           bad = True
783     return bad
784
785   def CheckPrereq(self):
786     """Check prerequisites.
787
788     This has no prerequisites.
789
790     """
791     pass
792
793   def Exec(self, feedback_fn):
794     """Verify integrity of cluster, performing various test on nodes.
795
796     """
797     bad = False
798     feedback_fn("* Verifying global settings")
799     for msg in self.cfg.VerifyConfig():
800       feedback_fn("  - ERROR: %s" % msg)
801
802     vg_name = self.cfg.GetVGName()
803     nodelist = utils.NiceSort(self.cfg.GetNodeList())
804     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
805     node_volume = {}
806     node_instance = {}
807
808     # FIXME: verify OS list
809     # do local checksums
810     file_names = list(self.sstore.GetFileList())
811     file_names.append(constants.SSL_CERT_FILE)
812     file_names.append(constants.CLUSTER_CONF_FILE)
813     local_checksums = utils.FingerprintFiles(file_names)
814
815     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
816     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
817     all_instanceinfo = rpc.call_instance_list(nodelist)
818     all_vglist = rpc.call_vg_list(nodelist)
819     node_verify_param = {
820       'filelist': file_names,
821       'nodelist': nodelist,
822       'hypervisor': None,
823       }
824     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
825     all_rversion = rpc.call_version(nodelist)
826
827     for node in nodelist:
828       feedback_fn("* Verifying node %s" % node)
829       result = self._VerifyNode(node, file_names, local_checksums,
830                                 all_vglist[node], all_nvinfo[node],
831                                 all_rversion[node], feedback_fn)
832       bad = bad or result
833
834       # node_volume
835       volumeinfo = all_volumeinfo[node]
836
837       if type(volumeinfo) != dict:
838         feedback_fn("  - ERROR: connection to %s failed" % (node,))
839         bad = True
840         continue
841
842       node_volume[node] = volumeinfo
843
844       # node_instance
845       nodeinstance = all_instanceinfo[node]
846       if type(nodeinstance) != list:
847         feedback_fn("  - ERROR: connection to %s failed" % (node,))
848         bad = True
849         continue
850
851       node_instance[node] = nodeinstance
852
853     node_vol_should = {}
854
855     for instance in instancelist:
856       feedback_fn("* Verifying instance %s" % instance)
857       result =  self._VerifyInstance(instance, node_volume, node_instance,
858                                      feedback_fn)
859       bad = bad or result
860
861       inst_config = self.cfg.GetInstanceInfo(instance)
862
863       inst_config.MapLVsByNode(node_vol_should)
864
865     feedback_fn("* Verifying orphan volumes")
866     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
867                                        feedback_fn)
868     bad = bad or result
869
870     feedback_fn("* Verifying remaining instances")
871     result = self._VerifyOrphanInstances(instancelist, node_instance,
872                                          feedback_fn)
873     bad = bad or result
874
875     return int(bad)
876
877
878 class LUVerifyDisks(NoHooksLU):
879   """Verifies the cluster disks status.
880
881   """
882   _OP_REQP = []
883
884   def CheckPrereq(self):
885     """Check prerequisites.
886
887     This has no prerequisites.
888
889     """
890     pass
891
892   def Exec(self, feedback_fn):
893     """Verify integrity of cluster disks.
894
895     """
896     result = res_nodes, res_instances = [], []
897
898     vg_name = self.cfg.GetVGName()
899     nodes = utils.NiceSort(self.cfg.GetNodeList())
900     instances = [self.cfg.GetInstanceInfo(name)
901                  for name in self.cfg.GetInstanceList()]
902
903     nv_dict = {}
904     for inst in instances:
905       inst_lvs = {}
906       if (inst.status != "up" or
907           inst.disk_template not in constants.DTS_NET_MIRROR):
908         continue
909       inst.MapLVsByNode(inst_lvs)
910       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
911       for node, vol_list in inst_lvs.iteritems():
912         for vol in vol_list:
913           nv_dict[(node, vol)] = inst
914
915     if not nv_dict:
916       return result
917
918     node_lvs = rpc.call_volume_list(nodes, vg_name)
919
920     to_act = set()
921     for node in nodes:
922       # node_volume
923       lvs = node_lvs[node]
924
925       if not isinstance(lvs, dict):
926         logger.Info("connection to node %s failed or invalid data returned" %
927                     (node,))
928         res_nodes.append(node)
929         continue
930
931       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
932         if not lv_online:
933           inst = nv_dict.get((node, lv_name), None)
934           if inst is not None and inst.name not in res_instances:
935             res_instances.append(inst.name)
936
937     return result
938
939
940 class LURenameCluster(LogicalUnit):
941   """Rename the cluster.
942
943   """
944   HPATH = "cluster-rename"
945   HTYPE = constants.HTYPE_CLUSTER
946   _OP_REQP = ["name"]
947
948   def BuildHooksEnv(self):
949     """Build hooks env.
950
951     """
952     env = {
953       "OP_TARGET": self.op.sstore.GetClusterName(),
954       "NEW_NAME": self.op.name,
955       }
956     mn = self.sstore.GetMasterNode()
957     return env, [mn], [mn]
958
959   def CheckPrereq(self):
960     """Verify that the passed name is a valid one.
961
962     """
963     hostname = utils.HostInfo(self.op.name)
964
965     new_name = hostname.name
966     self.ip = new_ip = hostname.ip
967     old_name = self.sstore.GetClusterName()
968     old_ip = self.sstore.GetMasterIP()
969     if new_name == old_name and new_ip == old_ip:
970       raise errors.OpPrereqError("Neither the name nor the IP address of the"
971                                  " cluster has changed")
972     if new_ip != old_ip:
973       result = utils.RunCmd(["fping", "-q", new_ip])
974       if not result.failed:
975         raise errors.OpPrereqError("The given cluster IP address (%s) is"
976                                    " reachable on the network. Aborting." %
977                                    new_ip)
978
979     self.op.name = new_name
980
981   def Exec(self, feedback_fn):
982     """Rename the cluster.
983
984     """
985     clustername = self.op.name
986     ip = self.ip
987     ss = self.sstore
988
989     # shutdown the master IP
990     master = ss.GetMasterNode()
991     if not rpc.call_node_stop_master(master):
992       raise errors.OpExecError("Could not disable the master role")
993
994     try:
995       # modify the sstore
996       ss.SetKey(ss.SS_MASTER_IP, ip)
997       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
998
999       # Distribute updated ss config to all nodes
1000       myself = self.cfg.GetNodeInfo(master)
1001       dist_nodes = self.cfg.GetNodeList()
1002       if myself.name in dist_nodes:
1003         dist_nodes.remove(myself.name)
1004
1005       logger.Debug("Copying updated ssconf data to all nodes")
1006       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1007         fname = ss.KeyToFilename(keyname)
1008         result = rpc.call_upload_file(dist_nodes, fname)
1009         for to_node in dist_nodes:
1010           if not result[to_node]:
1011             logger.Error("copy of file %s to node %s failed" %
1012                          (fname, to_node))
1013     finally:
1014       if not rpc.call_node_start_master(master):
1015         logger.Error("Could not re-enable the master role on the master,"
1016                      " please restart manually.")
1017
1018
1019 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1020   """Sleep and poll for an instance's disk to sync.
1021
1022   """
1023   if not instance.disks:
1024     return True
1025
1026   if not oneshot:
1027     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1028
1029   node = instance.primary_node
1030
1031   for dev in instance.disks:
1032     cfgw.SetDiskID(dev, node)
1033
1034   retries = 0
1035   while True:
1036     max_time = 0
1037     done = True
1038     cumul_degraded = False
1039     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1040     if not rstats:
1041       proc.LogWarning("Can't get any data from node %s" % node)
1042       retries += 1
1043       if retries >= 10:
1044         raise errors.RemoteError("Can't contact node %s for mirror data,"
1045                                  " aborting." % node)
1046       time.sleep(6)
1047       continue
1048     retries = 0
1049     for i in range(len(rstats)):
1050       mstat = rstats[i]
1051       if mstat is None:
1052         proc.LogWarning("Can't compute data for node %s/%s" %
1053                         (node, instance.disks[i].iv_name))
1054         continue
1055       # we ignore the ldisk parameter
1056       perc_done, est_time, is_degraded, _ = mstat
1057       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1058       if perc_done is not None:
1059         done = False
1060         if est_time is not None:
1061           rem_time = "%d estimated seconds remaining" % est_time
1062           max_time = est_time
1063         else:
1064           rem_time = "no time estimate"
1065         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1066                      (instance.disks[i].iv_name, perc_done, rem_time))
1067     if done or oneshot:
1068       break
1069
1070     if unlock:
1071       utils.Unlock('cmd')
1072     try:
1073       time.sleep(min(60, max_time))
1074     finally:
1075       if unlock:
1076         utils.Lock('cmd')
1077
1078   if done:
1079     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1080   return not cumul_degraded
1081
1082
1083 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1084   """Check that mirrors are not degraded.
1085
1086   The ldisk parameter, if True, will change the test from the
1087   is_degraded attribute (which represents overall non-ok status for
1088   the device(s)) to the ldisk (representing the local storage status).
1089
1090   """
1091   cfgw.SetDiskID(dev, node)
1092   if ldisk:
1093     idx = 6
1094   else:
1095     idx = 5
1096
1097   result = True
1098   if on_primary or dev.AssembleOnSecondary():
1099     rstats = rpc.call_blockdev_find(node, dev)
1100     if not rstats:
1101       logger.ToStderr("Can't get any data from node %s" % node)
1102       result = False
1103     else:
1104       result = result and (not rstats[idx])
1105   if dev.children:
1106     for child in dev.children:
1107       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1108
1109   return result
1110
1111
1112 class LUDiagnoseOS(NoHooksLU):
1113   """Logical unit for OS diagnose/query.
1114
1115   """
1116   _OP_REQP = []
1117
1118   def CheckPrereq(self):
1119     """Check prerequisites.
1120
1121     This always succeeds, since this is a pure query LU.
1122
1123     """
1124     return
1125
1126   def Exec(self, feedback_fn):
1127     """Compute the list of OSes.
1128
1129     """
1130     node_list = self.cfg.GetNodeList()
1131     node_data = rpc.call_os_diagnose(node_list)
1132     if node_data == False:
1133       raise errors.OpExecError("Can't gather the list of OSes")
1134     return node_data
1135
1136
1137 class LURemoveNode(LogicalUnit):
1138   """Logical unit for removing a node.
1139
1140   """
1141   HPATH = "node-remove"
1142   HTYPE = constants.HTYPE_NODE
1143   _OP_REQP = ["node_name"]
1144
1145   def BuildHooksEnv(self):
1146     """Build hooks env.
1147
1148     This doesn't run on the target node in the pre phase as a failed
1149     node would not allows itself to run.
1150
1151     """
1152     env = {
1153       "OP_TARGET": self.op.node_name,
1154       "NODE_NAME": self.op.node_name,
1155       }
1156     all_nodes = self.cfg.GetNodeList()
1157     all_nodes.remove(self.op.node_name)
1158     return env, all_nodes, all_nodes
1159
1160   def CheckPrereq(self):
1161     """Check prerequisites.
1162
1163     This checks:
1164      - the node exists in the configuration
1165      - it does not have primary or secondary instances
1166      - it's not the master
1167
1168     Any errors are signalled by raising errors.OpPrereqError.
1169
1170     """
1171     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1172     if node is None:
1173       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1174
1175     instance_list = self.cfg.GetInstanceList()
1176
1177     masternode = self.sstore.GetMasterNode()
1178     if node.name == masternode:
1179       raise errors.OpPrereqError("Node is the master node,"
1180                                  " you need to failover first.")
1181
1182     for instance_name in instance_list:
1183       instance = self.cfg.GetInstanceInfo(instance_name)
1184       if node.name == instance.primary_node:
1185         raise errors.OpPrereqError("Instance %s still running on the node,"
1186                                    " please remove first." % instance_name)
1187       if node.name in instance.secondary_nodes:
1188         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1189                                    " please remove first." % instance_name)
1190     self.op.node_name = node.name
1191     self.node = node
1192
1193   def Exec(self, feedback_fn):
1194     """Removes the node from the cluster.
1195
1196     """
1197     node = self.node
1198     logger.Info("stopping the node daemon and removing configs from node %s" %
1199                 node.name)
1200
1201     rpc.call_node_leave_cluster(node.name)
1202
1203     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1204
1205     logger.Info("Removing node %s from config" % node.name)
1206
1207     self.cfg.RemoveNode(node.name)
1208
1209     _RemoveHostFromEtcHosts(node.name)
1210
1211
1212 class LUQueryNodes(NoHooksLU):
1213   """Logical unit for querying nodes.
1214
1215   """
1216   _OP_REQP = ["output_fields", "names"]
1217
1218   def CheckPrereq(self):
1219     """Check prerequisites.
1220
1221     This checks that the fields required are valid output fields.
1222
1223     """
1224     self.dynamic_fields = frozenset(["dtotal", "dfree",
1225                                      "mtotal", "mnode", "mfree",
1226                                      "bootid"])
1227
1228     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1229                                "pinst_list", "sinst_list",
1230                                "pip", "sip"],
1231                        dynamic=self.dynamic_fields,
1232                        selected=self.op.output_fields)
1233
1234     self.wanted = _GetWantedNodes(self, self.op.names)
1235
1236   def Exec(self, feedback_fn):
1237     """Computes the list of nodes and their attributes.
1238
1239     """
1240     nodenames = self.wanted
1241     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1242
1243     # begin data gathering
1244
1245     if self.dynamic_fields.intersection(self.op.output_fields):
1246       live_data = {}
1247       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1248       for name in nodenames:
1249         nodeinfo = node_data.get(name, None)
1250         if nodeinfo:
1251           live_data[name] = {
1252             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1253             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1254             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1255             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1256             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1257             "bootid": nodeinfo['bootid'],
1258             }
1259         else:
1260           live_data[name] = {}
1261     else:
1262       live_data = dict.fromkeys(nodenames, {})
1263
1264     node_to_primary = dict([(name, set()) for name in nodenames])
1265     node_to_secondary = dict([(name, set()) for name in nodenames])
1266
1267     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1268                              "sinst_cnt", "sinst_list"))
1269     if inst_fields & frozenset(self.op.output_fields):
1270       instancelist = self.cfg.GetInstanceList()
1271
1272       for instance_name in instancelist:
1273         inst = self.cfg.GetInstanceInfo(instance_name)
1274         if inst.primary_node in node_to_primary:
1275           node_to_primary[inst.primary_node].add(inst.name)
1276         for secnode in inst.secondary_nodes:
1277           if secnode in node_to_secondary:
1278             node_to_secondary[secnode].add(inst.name)
1279
1280     # end data gathering
1281
1282     output = []
1283     for node in nodelist:
1284       node_output = []
1285       for field in self.op.output_fields:
1286         if field == "name":
1287           val = node.name
1288         elif field == "pinst_list":
1289           val = list(node_to_primary[node.name])
1290         elif field == "sinst_list":
1291           val = list(node_to_secondary[node.name])
1292         elif field == "pinst_cnt":
1293           val = len(node_to_primary[node.name])
1294         elif field == "sinst_cnt":
1295           val = len(node_to_secondary[node.name])
1296         elif field == "pip":
1297           val = node.primary_ip
1298         elif field == "sip":
1299           val = node.secondary_ip
1300         elif field in self.dynamic_fields:
1301           val = live_data[node.name].get(field, None)
1302         else:
1303           raise errors.ParameterError(field)
1304         node_output.append(val)
1305       output.append(node_output)
1306
1307     return output
1308
1309
1310 class LUQueryNodeVolumes(NoHooksLU):
1311   """Logical unit for getting volumes on node(s).
1312
1313   """
1314   _OP_REQP = ["nodes", "output_fields"]
1315
1316   def CheckPrereq(self):
1317     """Check prerequisites.
1318
1319     This checks that the fields required are valid output fields.
1320
1321     """
1322     self.nodes = _GetWantedNodes(self, self.op.nodes)
1323
1324     _CheckOutputFields(static=["node"],
1325                        dynamic=["phys", "vg", "name", "size", "instance"],
1326                        selected=self.op.output_fields)
1327
1328
1329   def Exec(self, feedback_fn):
1330     """Computes the list of nodes and their attributes.
1331
1332     """
1333     nodenames = self.nodes
1334     volumes = rpc.call_node_volumes(nodenames)
1335
1336     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1337              in self.cfg.GetInstanceList()]
1338
1339     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1340
1341     output = []
1342     for node in nodenames:
1343       if node not in volumes or not volumes[node]:
1344         continue
1345
1346       node_vols = volumes[node][:]
1347       node_vols.sort(key=lambda vol: vol['dev'])
1348
1349       for vol in node_vols:
1350         node_output = []
1351         for field in self.op.output_fields:
1352           if field == "node":
1353             val = node
1354           elif field == "phys":
1355             val = vol['dev']
1356           elif field == "vg":
1357             val = vol['vg']
1358           elif field == "name":
1359             val = vol['name']
1360           elif field == "size":
1361             val = int(float(vol['size']))
1362           elif field == "instance":
1363             for inst in ilist:
1364               if node not in lv_by_node[inst]:
1365                 continue
1366               if vol['name'] in lv_by_node[inst][node]:
1367                 val = inst.name
1368                 break
1369             else:
1370               val = '-'
1371           else:
1372             raise errors.ParameterError(field)
1373           node_output.append(str(val))
1374
1375         output.append(node_output)
1376
1377     return output
1378
1379
1380 class LUAddNode(LogicalUnit):
1381   """Logical unit for adding node to the cluster.
1382
1383   """
1384   HPATH = "node-add"
1385   HTYPE = constants.HTYPE_NODE
1386   _OP_REQP = ["node_name"]
1387
1388   def BuildHooksEnv(self):
1389     """Build hooks env.
1390
1391     This will run on all nodes before, and on all nodes + the new node after.
1392
1393     """
1394     env = {
1395       "OP_TARGET": self.op.node_name,
1396       "NODE_NAME": self.op.node_name,
1397       "NODE_PIP": self.op.primary_ip,
1398       "NODE_SIP": self.op.secondary_ip,
1399       }
1400     nodes_0 = self.cfg.GetNodeList()
1401     nodes_1 = nodes_0 + [self.op.node_name, ]
1402     return env, nodes_0, nodes_1
1403
1404   def CheckPrereq(self):
1405     """Check prerequisites.
1406
1407     This checks:
1408      - the new node is not already in the config
1409      - it is resolvable
1410      - its parameters (single/dual homed) matches the cluster
1411
1412     Any errors are signalled by raising errors.OpPrereqError.
1413
1414     """
1415     node_name = self.op.node_name
1416     cfg = self.cfg
1417
1418     dns_data = utils.HostInfo(node_name)
1419
1420     node = dns_data.name
1421     primary_ip = self.op.primary_ip = dns_data.ip
1422     secondary_ip = getattr(self.op, "secondary_ip", None)
1423     if secondary_ip is None:
1424       secondary_ip = primary_ip
1425     if not utils.IsValidIP(secondary_ip):
1426       raise errors.OpPrereqError("Invalid secondary IP given")
1427     self.op.secondary_ip = secondary_ip
1428     node_list = cfg.GetNodeList()
1429     if node in node_list:
1430       raise errors.OpPrereqError("Node %s is already in the configuration"
1431                                  % node)
1432
1433     for existing_node_name in node_list:
1434       existing_node = cfg.GetNodeInfo(existing_node_name)
1435       if (existing_node.primary_ip == primary_ip or
1436           existing_node.secondary_ip == primary_ip or
1437           existing_node.primary_ip == secondary_ip or
1438           existing_node.secondary_ip == secondary_ip):
1439         raise errors.OpPrereqError("New node ip address(es) conflict with"
1440                                    " existing node %s" % existing_node.name)
1441
1442     # check that the type of the node (single versus dual homed) is the
1443     # same as for the master
1444     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1445     master_singlehomed = myself.secondary_ip == myself.primary_ip
1446     newbie_singlehomed = secondary_ip == primary_ip
1447     if master_singlehomed != newbie_singlehomed:
1448       if master_singlehomed:
1449         raise errors.OpPrereqError("The master has no private ip but the"
1450                                    " new node has one")
1451       else:
1452         raise errors.OpPrereqError("The master has a private ip but the"
1453                                    " new node doesn't have one")
1454
1455     # checks reachablity
1456     if not utils.TcpPing(utils.HostInfo().name,
1457                          primary_ip,
1458                          constants.DEFAULT_NODED_PORT):
1459       raise errors.OpPrereqError("Node not reachable by ping")
1460
1461     if not newbie_singlehomed:
1462       # check reachability from my secondary ip to newbie's secondary ip
1463       if not utils.TcpPing(myself.secondary_ip,
1464                            secondary_ip,
1465                            constants.DEFAULT_NODED_PORT):
1466         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1467                                    " based ping to noded port")
1468
1469     self.new_node = objects.Node(name=node,
1470                                  primary_ip=primary_ip,
1471                                  secondary_ip=secondary_ip)
1472
1473   def Exec(self, feedback_fn):
1474     """Adds the new node to the cluster.
1475
1476     """
1477     new_node = self.new_node
1478     node = new_node.name
1479
1480     # set up inter-node password and certificate and restarts the node daemon
1481     gntpass = self.sstore.GetNodeDaemonPassword()
1482     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1483       raise errors.OpExecError("ganeti password corruption detected")
1484     f = open(constants.SSL_CERT_FILE)
1485     try:
1486       gntpem = f.read(8192)
1487     finally:
1488       f.close()
1489     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1490     # so we use this to detect an invalid certificate; as long as the
1491     # cert doesn't contain this, the here-document will be correctly
1492     # parsed by the shell sequence below
1493     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1494       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1495     if not gntpem.endswith("\n"):
1496       raise errors.OpExecError("PEM must end with newline")
1497     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1498
1499     # and then connect with ssh to set password and start ganeti-noded
1500     # note that all the below variables are sanitized at this point,
1501     # either by being constants or by the checks above
1502     ss = self.sstore
1503     mycommand = ("umask 077 && "
1504                  "echo '%s' > '%s' && "
1505                  "cat > '%s' << '!EOF.' && \n"
1506                  "%s!EOF.\n%s restart" %
1507                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1508                   constants.SSL_CERT_FILE, gntpem,
1509                   constants.NODE_INITD_SCRIPT))
1510
1511     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1512     if result.failed:
1513       raise errors.OpExecError("Remote command on node %s, error: %s,"
1514                                " output: %s" %
1515                                (node, result.fail_reason, result.output))
1516
1517     # check connectivity
1518     time.sleep(4)
1519
1520     result = rpc.call_version([node])[node]
1521     if result:
1522       if constants.PROTOCOL_VERSION == result:
1523         logger.Info("communication to node %s fine, sw version %s match" %
1524                     (node, result))
1525       else:
1526         raise errors.OpExecError("Version mismatch master version %s,"
1527                                  " node version %s" %
1528                                  (constants.PROTOCOL_VERSION, result))
1529     else:
1530       raise errors.OpExecError("Cannot get version from the new node")
1531
1532     # setup ssh on node
1533     logger.Info("copy ssh key to node %s" % node)
1534     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1535     keyarray = []
1536     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1537                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1538                 priv_key, pub_key]
1539
1540     for i in keyfiles:
1541       f = open(i, 'r')
1542       try:
1543         keyarray.append(f.read())
1544       finally:
1545         f.close()
1546
1547     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1548                                keyarray[3], keyarray[4], keyarray[5])
1549
1550     if not result:
1551       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1552
1553     # Add node to our /etc/hosts, and add key to known_hosts
1554     _AddHostToEtcHosts(new_node.name)
1555
1556     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1557                       self.cfg.GetHostKey())
1558
1559     if new_node.secondary_ip != new_node.primary_ip:
1560       if not rpc.call_node_tcp_ping(new_node.name,
1561                                     constants.LOCALHOST_IP_ADDRESS,
1562                                     new_node.secondary_ip,
1563                                     constants.DEFAULT_NODED_PORT,
1564                                     10, False):
1565         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1566                                  " you gave (%s). Please fix and re-run this"
1567                                  " command." % new_node.secondary_ip)
1568
1569     success, msg = ssh.VerifyNodeHostname(node)
1570     if not success:
1571       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1572                                " than the one the resolver gives: %s."
1573                                " Please fix and re-run this command." %
1574                                (node, msg))
1575
1576     # Distribute updated /etc/hosts and known_hosts to all nodes,
1577     # including the node just added
1578     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1579     dist_nodes = self.cfg.GetNodeList() + [node]
1580     if myself.name in dist_nodes:
1581       dist_nodes.remove(myself.name)
1582
1583     logger.Debug("Copying hosts and known_hosts to all nodes")
1584     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1585       result = rpc.call_upload_file(dist_nodes, fname)
1586       for to_node in dist_nodes:
1587         if not result[to_node]:
1588           logger.Error("copy of file %s to node %s failed" %
1589                        (fname, to_node))
1590
1591     to_copy = ss.GetFileList()
1592     for fname in to_copy:
1593       if not ssh.CopyFileToNode(node, fname):
1594         logger.Error("could not copy file %s to node %s" % (fname, node))
1595
1596     logger.Info("adding node %s to cluster.conf" % node)
1597     self.cfg.AddNode(new_node)
1598
1599
1600 class LUMasterFailover(LogicalUnit):
1601   """Failover the master node to the current node.
1602
1603   This is a special LU in that it must run on a non-master node.
1604
1605   """
1606   HPATH = "master-failover"
1607   HTYPE = constants.HTYPE_CLUSTER
1608   REQ_MASTER = False
1609   _OP_REQP = []
1610
1611   def BuildHooksEnv(self):
1612     """Build hooks env.
1613
1614     This will run on the new master only in the pre phase, and on all
1615     the nodes in the post phase.
1616
1617     """
1618     env = {
1619       "OP_TARGET": self.new_master,
1620       "NEW_MASTER": self.new_master,
1621       "OLD_MASTER": self.old_master,
1622       }
1623     return env, [self.new_master], self.cfg.GetNodeList()
1624
1625   def CheckPrereq(self):
1626     """Check prerequisites.
1627
1628     This checks that we are not already the master.
1629
1630     """
1631     self.new_master = utils.HostInfo().name
1632     self.old_master = self.sstore.GetMasterNode()
1633
1634     if self.old_master == self.new_master:
1635       raise errors.OpPrereqError("This commands must be run on the node"
1636                                  " where you want the new master to be."
1637                                  " %s is already the master" %
1638                                  self.old_master)
1639
1640   def Exec(self, feedback_fn):
1641     """Failover the master node.
1642
1643     This command, when run on a non-master node, will cause the current
1644     master to cease being master, and the non-master to become new
1645     master.
1646
1647     """
1648     #TODO: do not rely on gethostname returning the FQDN
1649     logger.Info("setting master to %s, old master: %s" %
1650                 (self.new_master, self.old_master))
1651
1652     if not rpc.call_node_stop_master(self.old_master):
1653       logger.Error("could disable the master role on the old master"
1654                    " %s, please disable manually" % self.old_master)
1655
1656     ss = self.sstore
1657     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1658     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1659                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1660       logger.Error("could not distribute the new simple store master file"
1661                    " to the other nodes, please check.")
1662
1663     if not rpc.call_node_start_master(self.new_master):
1664       logger.Error("could not start the master role on the new master"
1665                    " %s, please check" % self.new_master)
1666       feedback_fn("Error in activating the master IP on the new master,"
1667                   " please fix manually.")
1668
1669
1670
1671 class LUQueryClusterInfo(NoHooksLU):
1672   """Query cluster configuration.
1673
1674   """
1675   _OP_REQP = []
1676   REQ_MASTER = False
1677
1678   def CheckPrereq(self):
1679     """No prerequsites needed for this LU.
1680
1681     """
1682     pass
1683
1684   def Exec(self, feedback_fn):
1685     """Return cluster config.
1686
1687     """
1688     result = {
1689       "name": self.sstore.GetClusterName(),
1690       "software_version": constants.RELEASE_VERSION,
1691       "protocol_version": constants.PROTOCOL_VERSION,
1692       "config_version": constants.CONFIG_VERSION,
1693       "os_api_version": constants.OS_API_VERSION,
1694       "export_version": constants.EXPORT_VERSION,
1695       "master": self.sstore.GetMasterNode(),
1696       "architecture": (platform.architecture()[0], platform.machine()),
1697       }
1698
1699     return result
1700
1701
1702 class LUClusterCopyFile(NoHooksLU):
1703   """Copy file to cluster.
1704
1705   """
1706   _OP_REQP = ["nodes", "filename"]
1707
1708   def CheckPrereq(self):
1709     """Check prerequisites.
1710
1711     It should check that the named file exists and that the given list
1712     of nodes is valid.
1713
1714     """
1715     if not os.path.exists(self.op.filename):
1716       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1717
1718     self.nodes = _GetWantedNodes(self, self.op.nodes)
1719
1720   def Exec(self, feedback_fn):
1721     """Copy a file from master to some nodes.
1722
1723     Args:
1724       opts - class with options as members
1725       args - list containing a single element, the file name
1726     Opts used:
1727       nodes - list containing the name of target nodes; if empty, all nodes
1728
1729     """
1730     filename = self.op.filename
1731
1732     myname = utils.HostInfo().name
1733
1734     for node in self.nodes:
1735       if node == myname:
1736         continue
1737       if not ssh.CopyFileToNode(node, filename):
1738         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1739
1740
1741 class LUDumpClusterConfig(NoHooksLU):
1742   """Return a text-representation of the cluster-config.
1743
1744   """
1745   _OP_REQP = []
1746
1747   def CheckPrereq(self):
1748     """No prerequisites.
1749
1750     """
1751     pass
1752
1753   def Exec(self, feedback_fn):
1754     """Dump a representation of the cluster config to the standard output.
1755
1756     """
1757     return self.cfg.DumpConfig()
1758
1759
1760 class LURunClusterCommand(NoHooksLU):
1761   """Run a command on some nodes.
1762
1763   """
1764   _OP_REQP = ["command", "nodes"]
1765
1766   def CheckPrereq(self):
1767     """Check prerequisites.
1768
1769     It checks that the given list of nodes is valid.
1770
1771     """
1772     self.nodes = _GetWantedNodes(self, self.op.nodes)
1773
1774   def Exec(self, feedback_fn):
1775     """Run a command on some nodes.
1776
1777     """
1778     data = []
1779     for node in self.nodes:
1780       result = ssh.SSHCall(node, "root", self.op.command)
1781       data.append((node, result.output, result.exit_code))
1782
1783     return data
1784
1785
1786 class LUActivateInstanceDisks(NoHooksLU):
1787   """Bring up an instance's disks.
1788
1789   """
1790   _OP_REQP = ["instance_name"]
1791
1792   def CheckPrereq(self):
1793     """Check prerequisites.
1794
1795     This checks that the instance is in the cluster.
1796
1797     """
1798     instance = self.cfg.GetInstanceInfo(
1799       self.cfg.ExpandInstanceName(self.op.instance_name))
1800     if instance is None:
1801       raise errors.OpPrereqError("Instance '%s' not known" %
1802                                  self.op.instance_name)
1803     self.instance = instance
1804
1805
1806   def Exec(self, feedback_fn):
1807     """Activate the disks.
1808
1809     """
1810     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1811     if not disks_ok:
1812       raise errors.OpExecError("Cannot activate block devices")
1813
1814     return disks_info
1815
1816
1817 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1818   """Prepare the block devices for an instance.
1819
1820   This sets up the block devices on all nodes.
1821
1822   Args:
1823     instance: a ganeti.objects.Instance object
1824     ignore_secondaries: if true, errors on secondary nodes won't result
1825                         in an error return from the function
1826
1827   Returns:
1828     false if the operation failed
1829     list of (host, instance_visible_name, node_visible_name) if the operation
1830          suceeded with the mapping from node devices to instance devices
1831   """
1832   device_info = []
1833   disks_ok = True
1834   for inst_disk in instance.disks:
1835     master_result = None
1836     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1837       cfg.SetDiskID(node_disk, node)
1838       is_primary = node == instance.primary_node
1839       result = rpc.call_blockdev_assemble(node, node_disk,
1840                                           instance.name, is_primary)
1841       if not result:
1842         logger.Error("could not prepare block device %s on node %s"
1843                      " (is_primary=%s)" %
1844                      (inst_disk.iv_name, node, is_primary))
1845         if is_primary or not ignore_secondaries:
1846           disks_ok = False
1847       if is_primary:
1848         master_result = result
1849     device_info.append((instance.primary_node, inst_disk.iv_name,
1850                         master_result))
1851
1852   # leave the disks configured for the primary node
1853   # this is a workaround that would be fixed better by
1854   # improving the logical/physical id handling
1855   for disk in instance.disks:
1856     cfg.SetDiskID(disk, instance.primary_node)
1857
1858   return disks_ok, device_info
1859
1860
1861 def _StartInstanceDisks(cfg, instance, force):
1862   """Start the disks of an instance.
1863
1864   """
1865   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1866                                            ignore_secondaries=force)
1867   if not disks_ok:
1868     _ShutdownInstanceDisks(instance, cfg)
1869     if force is not None and not force:
1870       logger.Error("If the message above refers to a secondary node,"
1871                    " you can retry the operation using '--force'.")
1872     raise errors.OpExecError("Disk consistency error")
1873
1874
1875 class LUDeactivateInstanceDisks(NoHooksLU):
1876   """Shutdown an instance's disks.
1877
1878   """
1879   _OP_REQP = ["instance_name"]
1880
1881   def CheckPrereq(self):
1882     """Check prerequisites.
1883
1884     This checks that the instance is in the cluster.
1885
1886     """
1887     instance = self.cfg.GetInstanceInfo(
1888       self.cfg.ExpandInstanceName(self.op.instance_name))
1889     if instance is None:
1890       raise errors.OpPrereqError("Instance '%s' not known" %
1891                                  self.op.instance_name)
1892     self.instance = instance
1893
1894   def Exec(self, feedback_fn):
1895     """Deactivate the disks
1896
1897     """
1898     instance = self.instance
1899     ins_l = rpc.call_instance_list([instance.primary_node])
1900     ins_l = ins_l[instance.primary_node]
1901     if not type(ins_l) is list:
1902       raise errors.OpExecError("Can't contact node '%s'" %
1903                                instance.primary_node)
1904
1905     if self.instance.name in ins_l:
1906       raise errors.OpExecError("Instance is running, can't shutdown"
1907                                " block devices.")
1908
1909     _ShutdownInstanceDisks(instance, self.cfg)
1910
1911
1912 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1913   """Shutdown block devices of an instance.
1914
1915   This does the shutdown on all nodes of the instance.
1916
1917   If the ignore_primary is false, errors on the primary node are
1918   ignored.
1919
1920   """
1921   result = True
1922   for disk in instance.disks:
1923     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1924       cfg.SetDiskID(top_disk, node)
1925       if not rpc.call_blockdev_shutdown(node, top_disk):
1926         logger.Error("could not shutdown block device %s on node %s" %
1927                      (disk.iv_name, node))
1928         if not ignore_primary or node != instance.primary_node:
1929           result = False
1930   return result
1931
1932
1933 class LUStartupInstance(LogicalUnit):
1934   """Starts an instance.
1935
1936   """
1937   HPATH = "instance-start"
1938   HTYPE = constants.HTYPE_INSTANCE
1939   _OP_REQP = ["instance_name", "force"]
1940
1941   def BuildHooksEnv(self):
1942     """Build hooks env.
1943
1944     This runs on master, primary and secondary nodes of the instance.
1945
1946     """
1947     env = {
1948       "FORCE": self.op.force,
1949       }
1950     env.update(_BuildInstanceHookEnvByObject(self.instance))
1951     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1952           list(self.instance.secondary_nodes))
1953     return env, nl, nl
1954
1955   def CheckPrereq(self):
1956     """Check prerequisites.
1957
1958     This checks that the instance is in the cluster.
1959
1960     """
1961     instance = self.cfg.GetInstanceInfo(
1962       self.cfg.ExpandInstanceName(self.op.instance_name))
1963     if instance is None:
1964       raise errors.OpPrereqError("Instance '%s' not known" %
1965                                  self.op.instance_name)
1966
1967     # check bridges existance
1968     _CheckInstanceBridgesExist(instance)
1969
1970     self.instance = instance
1971     self.op.instance_name = instance.name
1972
1973   def Exec(self, feedback_fn):
1974     """Start the instance.
1975
1976     """
1977     instance = self.instance
1978     force = self.op.force
1979     extra_args = getattr(self.op, "extra_args", "")
1980
1981     node_current = instance.primary_node
1982
1983     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1984     if not nodeinfo:
1985       raise errors.OpExecError("Could not contact node %s for infos" %
1986                                (node_current))
1987
1988     freememory = nodeinfo[node_current]['memory_free']
1989     memory = instance.memory
1990     if memory > freememory:
1991       raise errors.OpExecError("Not enough memory to start instance"
1992                                " %s on node %s"
1993                                " needed %s MiB, available %s MiB" %
1994                                (instance.name, node_current, memory,
1995                                 freememory))
1996
1997     _StartInstanceDisks(self.cfg, instance, force)
1998
1999     if not rpc.call_instance_start(node_current, instance, extra_args):
2000       _ShutdownInstanceDisks(instance, self.cfg)
2001       raise errors.OpExecError("Could not start instance")
2002
2003     self.cfg.MarkInstanceUp(instance.name)
2004
2005
2006 class LURebootInstance(LogicalUnit):
2007   """Reboot an instance.
2008
2009   """
2010   HPATH = "instance-reboot"
2011   HTYPE = constants.HTYPE_INSTANCE
2012   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2013
2014   def BuildHooksEnv(self):
2015     """Build hooks env.
2016
2017     This runs on master, primary and secondary nodes of the instance.
2018
2019     """
2020     env = {
2021       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2022       }
2023     env.update(_BuildInstanceHookEnvByObject(self.instance))
2024     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2025           list(self.instance.secondary_nodes))
2026     return env, nl, nl
2027
2028   def CheckPrereq(self):
2029     """Check prerequisites.
2030
2031     This checks that the instance is in the cluster.
2032
2033     """
2034     instance = self.cfg.GetInstanceInfo(
2035       self.cfg.ExpandInstanceName(self.op.instance_name))
2036     if instance is None:
2037       raise errors.OpPrereqError("Instance '%s' not known" %
2038                                  self.op.instance_name)
2039
2040     # check bridges existance
2041     _CheckInstanceBridgesExist(instance)
2042
2043     self.instance = instance
2044     self.op.instance_name = instance.name
2045
2046   def Exec(self, feedback_fn):
2047     """Reboot the instance.
2048
2049     """
2050     instance = self.instance
2051     ignore_secondaries = self.op.ignore_secondaries
2052     reboot_type = self.op.reboot_type
2053     extra_args = getattr(self.op, "extra_args", "")
2054
2055     node_current = instance.primary_node
2056
2057     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2058                            constants.INSTANCE_REBOOT_HARD,
2059                            constants.INSTANCE_REBOOT_FULL]:
2060       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2061                                   (constants.INSTANCE_REBOOT_SOFT,
2062                                    constants.INSTANCE_REBOOT_HARD,
2063                                    constants.INSTANCE_REBOOT_FULL))
2064
2065     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2066                        constants.INSTANCE_REBOOT_HARD]:
2067       if not rpc.call_instance_reboot(node_current, instance,
2068                                       reboot_type, extra_args):
2069         raise errors.OpExecError("Could not reboot instance")
2070     else:
2071       if not rpc.call_instance_shutdown(node_current, instance):
2072         raise errors.OpExecError("could not shutdown instance for full reboot")
2073       _ShutdownInstanceDisks(instance, self.cfg)
2074       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2075       if not rpc.call_instance_start(node_current, instance, extra_args):
2076         _ShutdownInstanceDisks(instance, self.cfg)
2077         raise errors.OpExecError("Could not start instance for full reboot")
2078
2079     self.cfg.MarkInstanceUp(instance.name)
2080
2081
2082 class LUShutdownInstance(LogicalUnit):
2083   """Shutdown an instance.
2084
2085   """
2086   HPATH = "instance-stop"
2087   HTYPE = constants.HTYPE_INSTANCE
2088   _OP_REQP = ["instance_name"]
2089
2090   def BuildHooksEnv(self):
2091     """Build hooks env.
2092
2093     This runs on master, primary and secondary nodes of the instance.
2094
2095     """
2096     env = _BuildInstanceHookEnvByObject(self.instance)
2097     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2098           list(self.instance.secondary_nodes))
2099     return env, nl, nl
2100
2101   def CheckPrereq(self):
2102     """Check prerequisites.
2103
2104     This checks that the instance is in the cluster.
2105
2106     """
2107     instance = self.cfg.GetInstanceInfo(
2108       self.cfg.ExpandInstanceName(self.op.instance_name))
2109     if instance is None:
2110       raise errors.OpPrereqError("Instance '%s' not known" %
2111                                  self.op.instance_name)
2112     self.instance = instance
2113
2114   def Exec(self, feedback_fn):
2115     """Shutdown the instance.
2116
2117     """
2118     instance = self.instance
2119     node_current = instance.primary_node
2120     if not rpc.call_instance_shutdown(node_current, instance):
2121       logger.Error("could not shutdown instance")
2122
2123     self.cfg.MarkInstanceDown(instance.name)
2124     _ShutdownInstanceDisks(instance, self.cfg)
2125
2126
2127 class LUReinstallInstance(LogicalUnit):
2128   """Reinstall an instance.
2129
2130   """
2131   HPATH = "instance-reinstall"
2132   HTYPE = constants.HTYPE_INSTANCE
2133   _OP_REQP = ["instance_name"]
2134
2135   def BuildHooksEnv(self):
2136     """Build hooks env.
2137
2138     This runs on master, primary and secondary nodes of the instance.
2139
2140     """
2141     env = _BuildInstanceHookEnvByObject(self.instance)
2142     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2143           list(self.instance.secondary_nodes))
2144     return env, nl, nl
2145
2146   def CheckPrereq(self):
2147     """Check prerequisites.
2148
2149     This checks that the instance is in the cluster and is not running.
2150
2151     """
2152     instance = self.cfg.GetInstanceInfo(
2153       self.cfg.ExpandInstanceName(self.op.instance_name))
2154     if instance is None:
2155       raise errors.OpPrereqError("Instance '%s' not known" %
2156                                  self.op.instance_name)
2157     if instance.disk_template == constants.DT_DISKLESS:
2158       raise errors.OpPrereqError("Instance '%s' has no disks" %
2159                                  self.op.instance_name)
2160     if instance.status != "down":
2161       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2162                                  self.op.instance_name)
2163     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2164     if remote_info:
2165       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2166                                  (self.op.instance_name,
2167                                   instance.primary_node))
2168
2169     self.op.os_type = getattr(self.op, "os_type", None)
2170     if self.op.os_type is not None:
2171       # OS verification
2172       pnode = self.cfg.GetNodeInfo(
2173         self.cfg.ExpandNodeName(instance.primary_node))
2174       if pnode is None:
2175         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2176                                    self.op.pnode)
2177       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2178       if not os_obj:
2179         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2180                                    " primary node"  % self.op.os_type)
2181
2182     self.instance = instance
2183
2184   def Exec(self, feedback_fn):
2185     """Reinstall the instance.
2186
2187     """
2188     inst = self.instance
2189
2190     if self.op.os_type is not None:
2191       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2192       inst.os = self.op.os_type
2193       self.cfg.AddInstance(inst)
2194
2195     _StartInstanceDisks(self.cfg, inst, None)
2196     try:
2197       feedback_fn("Running the instance OS create scripts...")
2198       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2199         raise errors.OpExecError("Could not install OS for instance %s"
2200                                  " on node %s" %
2201                                  (inst.name, inst.primary_node))
2202     finally:
2203       _ShutdownInstanceDisks(inst, self.cfg)
2204
2205
2206 class LURenameInstance(LogicalUnit):
2207   """Rename an instance.
2208
2209   """
2210   HPATH = "instance-rename"
2211   HTYPE = constants.HTYPE_INSTANCE
2212   _OP_REQP = ["instance_name", "new_name"]
2213
2214   def BuildHooksEnv(self):
2215     """Build hooks env.
2216
2217     This runs on master, primary and secondary nodes of the instance.
2218
2219     """
2220     env = _BuildInstanceHookEnvByObject(self.instance)
2221     env["INSTANCE_NEW_NAME"] = self.op.new_name
2222     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2223           list(self.instance.secondary_nodes))
2224     return env, nl, nl
2225
2226   def CheckPrereq(self):
2227     """Check prerequisites.
2228
2229     This checks that the instance is in the cluster and is not running.
2230
2231     """
2232     instance = self.cfg.GetInstanceInfo(
2233       self.cfg.ExpandInstanceName(self.op.instance_name))
2234     if instance is None:
2235       raise errors.OpPrereqError("Instance '%s' not known" %
2236                                  self.op.instance_name)
2237     if instance.status != "down":
2238       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2239                                  self.op.instance_name)
2240     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2241     if remote_info:
2242       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2243                                  (self.op.instance_name,
2244                                   instance.primary_node))
2245     self.instance = instance
2246
2247     # new name verification
2248     name_info = utils.HostInfo(self.op.new_name)
2249
2250     self.op.new_name = new_name = name_info.name
2251     if not getattr(self.op, "ignore_ip", False):
2252       command = ["fping", "-q", name_info.ip]
2253       result = utils.RunCmd(command)
2254       if not result.failed:
2255         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2256                                    (name_info.ip, new_name))
2257
2258
2259   def Exec(self, feedback_fn):
2260     """Reinstall the instance.
2261
2262     """
2263     inst = self.instance
2264     old_name = inst.name
2265
2266     self.cfg.RenameInstance(inst.name, self.op.new_name)
2267
2268     # re-read the instance from the configuration after rename
2269     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2270
2271     _StartInstanceDisks(self.cfg, inst, None)
2272     try:
2273       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2274                                           "sda", "sdb"):
2275         msg = ("Could run OS rename script for instance %s on node %s (but the"
2276                " instance has been renamed in Ganeti)" %
2277                (inst.name, inst.primary_node))
2278         logger.Error(msg)
2279     finally:
2280       _ShutdownInstanceDisks(inst, self.cfg)
2281
2282
2283 class LURemoveInstance(LogicalUnit):
2284   """Remove an instance.
2285
2286   """
2287   HPATH = "instance-remove"
2288   HTYPE = constants.HTYPE_INSTANCE
2289   _OP_REQP = ["instance_name"]
2290
2291   def BuildHooksEnv(self):
2292     """Build hooks env.
2293
2294     This runs on master, primary and secondary nodes of the instance.
2295
2296     """
2297     env = _BuildInstanceHookEnvByObject(self.instance)
2298     nl = [self.sstore.GetMasterNode()]
2299     return env, nl, nl
2300
2301   def CheckPrereq(self):
2302     """Check prerequisites.
2303
2304     This checks that the instance is in the cluster.
2305
2306     """
2307     instance = self.cfg.GetInstanceInfo(
2308       self.cfg.ExpandInstanceName(self.op.instance_name))
2309     if instance is None:
2310       raise errors.OpPrereqError("Instance '%s' not known" %
2311                                  self.op.instance_name)
2312     self.instance = instance
2313
2314   def Exec(self, feedback_fn):
2315     """Remove the instance.
2316
2317     """
2318     instance = self.instance
2319     logger.Info("shutting down instance %s on node %s" %
2320                 (instance.name, instance.primary_node))
2321
2322     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2323       if self.op.ignore_failures:
2324         feedback_fn("Warning: can't shutdown instance")
2325       else:
2326         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2327                                  (instance.name, instance.primary_node))
2328
2329     logger.Info("removing block devices for instance %s" % instance.name)
2330
2331     if not _RemoveDisks(instance, self.cfg):
2332       if self.op.ignore_failures:
2333         feedback_fn("Warning: can't remove instance's disks")
2334       else:
2335         raise errors.OpExecError("Can't remove instance's disks")
2336
2337     logger.Info("removing instance %s out of cluster config" % instance.name)
2338
2339     self.cfg.RemoveInstance(instance.name)
2340
2341
2342 class LUQueryInstances(NoHooksLU):
2343   """Logical unit for querying instances.
2344
2345   """
2346   _OP_REQP = ["output_fields", "names"]
2347
2348   def CheckPrereq(self):
2349     """Check prerequisites.
2350
2351     This checks that the fields required are valid output fields.
2352
2353     """
2354     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2355     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2356                                "admin_state", "admin_ram",
2357                                "disk_template", "ip", "mac", "bridge",
2358                                "sda_size", "sdb_size"],
2359                        dynamic=self.dynamic_fields,
2360                        selected=self.op.output_fields)
2361
2362     self.wanted = _GetWantedInstances(self, self.op.names)
2363
2364   def Exec(self, feedback_fn):
2365     """Computes the list of nodes and their attributes.
2366
2367     """
2368     instance_names = self.wanted
2369     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2370                      in instance_names]
2371
2372     # begin data gathering
2373
2374     nodes = frozenset([inst.primary_node for inst in instance_list])
2375
2376     bad_nodes = []
2377     if self.dynamic_fields.intersection(self.op.output_fields):
2378       live_data = {}
2379       node_data = rpc.call_all_instances_info(nodes)
2380       for name in nodes:
2381         result = node_data[name]
2382         if result:
2383           live_data.update(result)
2384         elif result == False:
2385           bad_nodes.append(name)
2386         # else no instance is alive
2387     else:
2388       live_data = dict([(name, {}) for name in instance_names])
2389
2390     # end data gathering
2391
2392     output = []
2393     for instance in instance_list:
2394       iout = []
2395       for field in self.op.output_fields:
2396         if field == "name":
2397           val = instance.name
2398         elif field == "os":
2399           val = instance.os
2400         elif field == "pnode":
2401           val = instance.primary_node
2402         elif field == "snodes":
2403           val = list(instance.secondary_nodes)
2404         elif field == "admin_state":
2405           val = (instance.status != "down")
2406         elif field == "oper_state":
2407           if instance.primary_node in bad_nodes:
2408             val = None
2409           else:
2410             val = bool(live_data.get(instance.name))
2411         elif field == "admin_ram":
2412           val = instance.memory
2413         elif field == "oper_ram":
2414           if instance.primary_node in bad_nodes:
2415             val = None
2416           elif instance.name in live_data:
2417             val = live_data[instance.name].get("memory", "?")
2418           else:
2419             val = "-"
2420         elif field == "disk_template":
2421           val = instance.disk_template
2422         elif field == "ip":
2423           val = instance.nics[0].ip
2424         elif field == "bridge":
2425           val = instance.nics[0].bridge
2426         elif field == "mac":
2427           val = instance.nics[0].mac
2428         elif field == "sda_size" or field == "sdb_size":
2429           disk = instance.FindDisk(field[:3])
2430           if disk is None:
2431             val = None
2432           else:
2433             val = disk.size
2434         else:
2435           raise errors.ParameterError(field)
2436         iout.append(val)
2437       output.append(iout)
2438
2439     return output
2440
2441
2442 class LUFailoverInstance(LogicalUnit):
2443   """Failover an instance.
2444
2445   """
2446   HPATH = "instance-failover"
2447   HTYPE = constants.HTYPE_INSTANCE
2448   _OP_REQP = ["instance_name", "ignore_consistency"]
2449
2450   def BuildHooksEnv(self):
2451     """Build hooks env.
2452
2453     This runs on master, primary and secondary nodes of the instance.
2454
2455     """
2456     env = {
2457       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2458       }
2459     env.update(_BuildInstanceHookEnvByObject(self.instance))
2460     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2461     return env, nl, nl
2462
2463   def CheckPrereq(self):
2464     """Check prerequisites.
2465
2466     This checks that the instance is in the cluster.
2467
2468     """
2469     instance = self.cfg.GetInstanceInfo(
2470       self.cfg.ExpandInstanceName(self.op.instance_name))
2471     if instance is None:
2472       raise errors.OpPrereqError("Instance '%s' not known" %
2473                                  self.op.instance_name)
2474
2475     if instance.disk_template not in constants.DTS_NET_MIRROR:
2476       raise errors.OpPrereqError("Instance's disk layout is not"
2477                                  " network mirrored, cannot failover.")
2478
2479     secondary_nodes = instance.secondary_nodes
2480     if not secondary_nodes:
2481       raise errors.ProgrammerError("no secondary node but using "
2482                                    "DT_REMOTE_RAID1 template")
2483
2484     # check memory requirements on the secondary node
2485     target_node = secondary_nodes[0]
2486     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2487     info = nodeinfo.get(target_node, None)
2488     if not info:
2489       raise errors.OpPrereqError("Cannot get current information"
2490                                  " from node '%s'" % nodeinfo)
2491     if instance.memory > info['memory_free']:
2492       raise errors.OpPrereqError("Not enough memory on target node %s."
2493                                  " %d MB available, %d MB required" %
2494                                  (target_node, info['memory_free'],
2495                                   instance.memory))
2496
2497     # check bridge existance
2498     brlist = [nic.bridge for nic in instance.nics]
2499     if not rpc.call_bridges_exist(target_node, brlist):
2500       raise errors.OpPrereqError("One or more target bridges %s does not"
2501                                  " exist on destination node '%s'" %
2502                                  (brlist, target_node))
2503
2504     self.instance = instance
2505
2506   def Exec(self, feedback_fn):
2507     """Failover an instance.
2508
2509     The failover is done by shutting it down on its present node and
2510     starting it on the secondary.
2511
2512     """
2513     instance = self.instance
2514
2515     source_node = instance.primary_node
2516     target_node = instance.secondary_nodes[0]
2517
2518     feedback_fn("* checking disk consistency between source and target")
2519     for dev in instance.disks:
2520       # for remote_raid1, these are md over drbd
2521       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2522         if not self.op.ignore_consistency:
2523           raise errors.OpExecError("Disk %s is degraded on target node,"
2524                                    " aborting failover." % dev.iv_name)
2525
2526     feedback_fn("* checking target node resource availability")
2527     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2528
2529     if not nodeinfo:
2530       raise errors.OpExecError("Could not contact target node %s." %
2531                                target_node)
2532
2533     free_memory = int(nodeinfo[target_node]['memory_free'])
2534     memory = instance.memory
2535     if memory > free_memory:
2536       raise errors.OpExecError("Not enough memory to create instance %s on"
2537                                " node %s. needed %s MiB, available %s MiB" %
2538                                (instance.name, target_node, memory,
2539                                 free_memory))
2540
2541     feedback_fn("* shutting down instance on source node")
2542     logger.Info("Shutting down instance %s on node %s" %
2543                 (instance.name, source_node))
2544
2545     if not rpc.call_instance_shutdown(source_node, instance):
2546       if self.op.ignore_consistency:
2547         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2548                      " anyway. Please make sure node %s is down"  %
2549                      (instance.name, source_node, source_node))
2550       else:
2551         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2552                                  (instance.name, source_node))
2553
2554     feedback_fn("* deactivating the instance's disks on source node")
2555     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2556       raise errors.OpExecError("Can't shut down the instance's disks.")
2557
2558     instance.primary_node = target_node
2559     # distribute new instance config to the other nodes
2560     self.cfg.AddInstance(instance)
2561
2562     feedback_fn("* activating the instance's disks on target node")
2563     logger.Info("Starting instance %s on node %s" %
2564                 (instance.name, target_node))
2565
2566     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2567                                              ignore_secondaries=True)
2568     if not disks_ok:
2569       _ShutdownInstanceDisks(instance, self.cfg)
2570       raise errors.OpExecError("Can't activate the instance's disks")
2571
2572     feedback_fn("* starting the instance on the target node")
2573     if not rpc.call_instance_start(target_node, instance, None):
2574       _ShutdownInstanceDisks(instance, self.cfg)
2575       raise errors.OpExecError("Could not start instance %s on node %s." %
2576                                (instance.name, target_node))
2577
2578
2579 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2580   """Create a tree of block devices on the primary node.
2581
2582   This always creates all devices.
2583
2584   """
2585   if device.children:
2586     for child in device.children:
2587       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2588         return False
2589
2590   cfg.SetDiskID(device, node)
2591   new_id = rpc.call_blockdev_create(node, device, device.size,
2592                                     instance.name, True, info)
2593   if not new_id:
2594     return False
2595   if device.physical_id is None:
2596     device.physical_id = new_id
2597   return True
2598
2599
2600 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2601   """Create a tree of block devices on a secondary node.
2602
2603   If this device type has to be created on secondaries, create it and
2604   all its children.
2605
2606   If not, just recurse to children keeping the same 'force' value.
2607
2608   """
2609   if device.CreateOnSecondary():
2610     force = True
2611   if device.children:
2612     for child in device.children:
2613       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2614                                         child, force, info):
2615         return False
2616
2617   if not force:
2618     return True
2619   cfg.SetDiskID(device, node)
2620   new_id = rpc.call_blockdev_create(node, device, device.size,
2621                                     instance.name, False, info)
2622   if not new_id:
2623     return False
2624   if device.physical_id is None:
2625     device.physical_id = new_id
2626   return True
2627
2628
2629 def _GenerateUniqueNames(cfg, exts):
2630   """Generate a suitable LV name.
2631
2632   This will generate a logical volume name for the given instance.
2633
2634   """
2635   results = []
2636   for val in exts:
2637     new_id = cfg.GenerateUniqueID()
2638     results.append("%s%s" % (new_id, val))
2639   return results
2640
2641
2642 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2643   """Generate a drbd device complete with its children.
2644
2645   """
2646   port = cfg.AllocatePort()
2647   vgname = cfg.GetVGName()
2648   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2649                           logical_id=(vgname, names[0]))
2650   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2651                           logical_id=(vgname, names[1]))
2652   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2653                           logical_id = (primary, secondary, port),
2654                           children = [dev_data, dev_meta])
2655   return drbd_dev
2656
2657
2658 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2659   """Generate a drbd8 device complete with its children.
2660
2661   """
2662   port = cfg.AllocatePort()
2663   vgname = cfg.GetVGName()
2664   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2665                           logical_id=(vgname, names[0]))
2666   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2667                           logical_id=(vgname, names[1]))
2668   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2669                           logical_id = (primary, secondary, port),
2670                           children = [dev_data, dev_meta],
2671                           iv_name=iv_name)
2672   return drbd_dev
2673
2674 def _GenerateDiskTemplate(cfg, template_name,
2675                           instance_name, primary_node,
2676                           secondary_nodes, disk_sz, swap_sz):
2677   """Generate the entire disk layout for a given template type.
2678
2679   """
2680   #TODO: compute space requirements
2681
2682   vgname = cfg.GetVGName()
2683   if template_name == "diskless":
2684     disks = []
2685   elif template_name == "plain":
2686     if len(secondary_nodes) != 0:
2687       raise errors.ProgrammerError("Wrong template configuration")
2688
2689     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2690     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2691                            logical_id=(vgname, names[0]),
2692                            iv_name = "sda")
2693     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2694                            logical_id=(vgname, names[1]),
2695                            iv_name = "sdb")
2696     disks = [sda_dev, sdb_dev]
2697   elif template_name == "local_raid1":
2698     if len(secondary_nodes) != 0:
2699       raise errors.ProgrammerError("Wrong template configuration")
2700
2701
2702     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2703                                        ".sdb_m1", ".sdb_m2"])
2704     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2705                               logical_id=(vgname, names[0]))
2706     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2707                               logical_id=(vgname, names[1]))
2708     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2709                               size=disk_sz,
2710                               children = [sda_dev_m1, sda_dev_m2])
2711     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2712                               logical_id=(vgname, names[2]))
2713     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2714                               logical_id=(vgname, names[3]))
2715     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2716                               size=swap_sz,
2717                               children = [sdb_dev_m1, sdb_dev_m2])
2718     disks = [md_sda_dev, md_sdb_dev]
2719   elif template_name == constants.DT_REMOTE_RAID1:
2720     if len(secondary_nodes) != 1:
2721       raise errors.ProgrammerError("Wrong template configuration")
2722     remote_node = secondary_nodes[0]
2723     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2724                                        ".sdb_data", ".sdb_meta"])
2725     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2726                                          disk_sz, names[0:2])
2727     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2728                               children = [drbd_sda_dev], size=disk_sz)
2729     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2730                                          swap_sz, names[2:4])
2731     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2732                               children = [drbd_sdb_dev], size=swap_sz)
2733     disks = [md_sda_dev, md_sdb_dev]
2734   elif template_name == constants.DT_DRBD8:
2735     if len(secondary_nodes) != 1:
2736       raise errors.ProgrammerError("Wrong template configuration")
2737     remote_node = secondary_nodes[0]
2738     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2739                                        ".sdb_data", ".sdb_meta"])
2740     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2741                                          disk_sz, names[0:2], "sda")
2742     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2743                                          swap_sz, names[2:4], "sdb")
2744     disks = [drbd_sda_dev, drbd_sdb_dev]
2745   else:
2746     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2747   return disks
2748
2749
2750 def _GetInstanceInfoText(instance):
2751   """Compute that text that should be added to the disk's metadata.
2752
2753   """
2754   return "originstname+%s" % instance.name
2755
2756
2757 def _CreateDisks(cfg, instance):
2758   """Create all disks for an instance.
2759
2760   This abstracts away some work from AddInstance.
2761
2762   Args:
2763     instance: the instance object
2764
2765   Returns:
2766     True or False showing the success of the creation process
2767
2768   """
2769   info = _GetInstanceInfoText(instance)
2770
2771   for device in instance.disks:
2772     logger.Info("creating volume %s for instance %s" %
2773               (device.iv_name, instance.name))
2774     #HARDCODE
2775     for secondary_node in instance.secondary_nodes:
2776       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2777                                         device, False, info):
2778         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2779                      (device.iv_name, device, secondary_node))
2780         return False
2781     #HARDCODE
2782     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2783                                     instance, device, info):
2784       logger.Error("failed to create volume %s on primary!" %
2785                    device.iv_name)
2786       return False
2787   return True
2788
2789
2790 def _RemoveDisks(instance, cfg):
2791   """Remove all disks for an instance.
2792
2793   This abstracts away some work from `AddInstance()` and
2794   `RemoveInstance()`. Note that in case some of the devices couldn't
2795   be removed, the removal will continue with the other ones (compare
2796   with `_CreateDisks()`).
2797
2798   Args:
2799     instance: the instance object
2800
2801   Returns:
2802     True or False showing the success of the removal proces
2803
2804   """
2805   logger.Info("removing block devices for instance %s" % instance.name)
2806
2807   result = True
2808   for device in instance.disks:
2809     for node, disk in device.ComputeNodeTree(instance.primary_node):
2810       cfg.SetDiskID(disk, node)
2811       if not rpc.call_blockdev_remove(node, disk):
2812         logger.Error("could not remove block device %s on node %s,"
2813                      " continuing anyway" %
2814                      (device.iv_name, node))
2815         result = False
2816   return result
2817
2818
2819 class LUCreateInstance(LogicalUnit):
2820   """Create an instance.
2821
2822   """
2823   HPATH = "instance-add"
2824   HTYPE = constants.HTYPE_INSTANCE
2825   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2826               "disk_template", "swap_size", "mode", "start", "vcpus",
2827               "wait_for_sync", "ip_check"]
2828
2829   def BuildHooksEnv(self):
2830     """Build hooks env.
2831
2832     This runs on master, primary and secondary nodes of the instance.
2833
2834     """
2835     env = {
2836       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2837       "INSTANCE_DISK_SIZE": self.op.disk_size,
2838       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2839       "INSTANCE_ADD_MODE": self.op.mode,
2840       }
2841     if self.op.mode == constants.INSTANCE_IMPORT:
2842       env["INSTANCE_SRC_NODE"] = self.op.src_node
2843       env["INSTANCE_SRC_PATH"] = self.op.src_path
2844       env["INSTANCE_SRC_IMAGE"] = self.src_image
2845
2846     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2847       primary_node=self.op.pnode,
2848       secondary_nodes=self.secondaries,
2849       status=self.instance_status,
2850       os_type=self.op.os_type,
2851       memory=self.op.mem_size,
2852       vcpus=self.op.vcpus,
2853       nics=[(self.inst_ip, self.op.bridge)],
2854     ))
2855
2856     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2857           self.secondaries)
2858     return env, nl, nl
2859
2860
2861   def CheckPrereq(self):
2862     """Check prerequisites.
2863
2864     """
2865     if self.op.mode not in (constants.INSTANCE_CREATE,
2866                             constants.INSTANCE_IMPORT):
2867       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2868                                  self.op.mode)
2869
2870     if self.op.mode == constants.INSTANCE_IMPORT:
2871       src_node = getattr(self.op, "src_node", None)
2872       src_path = getattr(self.op, "src_path", None)
2873       if src_node is None or src_path is None:
2874         raise errors.OpPrereqError("Importing an instance requires source"
2875                                    " node and path options")
2876       src_node_full = self.cfg.ExpandNodeName(src_node)
2877       if src_node_full is None:
2878         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2879       self.op.src_node = src_node = src_node_full
2880
2881       if not os.path.isabs(src_path):
2882         raise errors.OpPrereqError("The source path must be absolute")
2883
2884       export_info = rpc.call_export_info(src_node, src_path)
2885
2886       if not export_info:
2887         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2888
2889       if not export_info.has_section(constants.INISECT_EXP):
2890         raise errors.ProgrammerError("Corrupted export config")
2891
2892       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2893       if (int(ei_version) != constants.EXPORT_VERSION):
2894         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2895                                    (ei_version, constants.EXPORT_VERSION))
2896
2897       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2898         raise errors.OpPrereqError("Can't import instance with more than"
2899                                    " one data disk")
2900
2901       # FIXME: are the old os-es, disk sizes, etc. useful?
2902       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2903       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2904                                                          'disk0_dump'))
2905       self.src_image = diskimage
2906     else: # INSTANCE_CREATE
2907       if getattr(self.op, "os_type", None) is None:
2908         raise errors.OpPrereqError("No guest OS specified")
2909
2910     # check primary node
2911     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2912     if pnode is None:
2913       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2914                                  self.op.pnode)
2915     self.op.pnode = pnode.name
2916     self.pnode = pnode
2917     self.secondaries = []
2918     # disk template and mirror node verification
2919     if self.op.disk_template not in constants.DISK_TEMPLATES:
2920       raise errors.OpPrereqError("Invalid disk template name")
2921
2922     if self.op.disk_template in constants.DTS_NET_MIRROR:
2923       if getattr(self.op, "snode", None) is None:
2924         raise errors.OpPrereqError("The networked disk templates need"
2925                                    " a mirror node")
2926
2927       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2928       if snode_name is None:
2929         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2930                                    self.op.snode)
2931       elif snode_name == pnode.name:
2932         raise errors.OpPrereqError("The secondary node cannot be"
2933                                    " the primary node.")
2934       self.secondaries.append(snode_name)
2935
2936     # Check lv size requirements
2937     nodenames = [pnode.name] + self.secondaries
2938     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2939
2940     # Required free disk space as a function of disk and swap space
2941     req_size_dict = {
2942       constants.DT_DISKLESS: 0,
2943       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2944       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2945       # 256 MB are added for drbd metadata, 128MB for each drbd device
2946       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2947       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2948     }
2949
2950     if self.op.disk_template not in req_size_dict:
2951       raise errors.ProgrammerError("Disk template '%s' size requirement"
2952                                    " is unknown" %  self.op.disk_template)
2953
2954     req_size = req_size_dict[self.op.disk_template]
2955
2956     for node in nodenames:
2957       info = nodeinfo.get(node, None)
2958       if not info:
2959         raise errors.OpPrereqError("Cannot get current information"
2960                                    " from node '%s'" % nodeinfo)
2961       if req_size > info['vg_free']:
2962         raise errors.OpPrereqError("Not enough disk space on target node %s."
2963                                    " %d MB available, %d MB required" %
2964                                    (node, info['vg_free'], req_size))
2965
2966     # os verification
2967     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2968     if not os_obj:
2969       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2970                                  " primary node"  % self.op.os_type)
2971
2972     # instance verification
2973     hostname1 = utils.HostInfo(self.op.instance_name)
2974
2975     self.op.instance_name = instance_name = hostname1.name
2976     instance_list = self.cfg.GetInstanceList()
2977     if instance_name in instance_list:
2978       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2979                                  instance_name)
2980
2981     ip = getattr(self.op, "ip", None)
2982     if ip is None or ip.lower() == "none":
2983       inst_ip = None
2984     elif ip.lower() == "auto":
2985       inst_ip = hostname1.ip
2986     else:
2987       if not utils.IsValidIP(ip):
2988         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2989                                    " like a valid IP" % ip)
2990       inst_ip = ip
2991     self.inst_ip = inst_ip
2992
2993     if self.op.start and not self.op.ip_check:
2994       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2995                                  " adding an instance in start mode")
2996
2997     if self.op.ip_check:
2998       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2999                        constants.DEFAULT_NODED_PORT):
3000         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3001                                    (hostname1.ip, instance_name))
3002
3003     # bridge verification
3004     bridge = getattr(self.op, "bridge", None)
3005     if bridge is None:
3006       self.op.bridge = self.cfg.GetDefBridge()
3007     else:
3008       self.op.bridge = bridge
3009
3010     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3011       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3012                                  " destination node '%s'" %
3013                                  (self.op.bridge, pnode.name))
3014
3015     if self.op.start:
3016       self.instance_status = 'up'
3017     else:
3018       self.instance_status = 'down'
3019
3020   def Exec(self, feedback_fn):
3021     """Create and add the instance to the cluster.
3022
3023     """
3024     instance = self.op.instance_name
3025     pnode_name = self.pnode.name
3026
3027     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3028     if self.inst_ip is not None:
3029       nic.ip = self.inst_ip
3030
3031     network_port = None  # placeholder assignment for later
3032
3033     disks = _GenerateDiskTemplate(self.cfg,
3034                                   self.op.disk_template,
3035                                   instance, pnode_name,
3036                                   self.secondaries, self.op.disk_size,
3037                                   self.op.swap_size)
3038
3039     iobj = objects.Instance(name=instance, os=self.op.os_type,
3040                             primary_node=pnode_name,
3041                             memory=self.op.mem_size,
3042                             vcpus=self.op.vcpus,
3043                             nics=[nic], disks=disks,
3044                             disk_template=self.op.disk_template,
3045                             status=self.instance_status,
3046                             network_port=network_port,
3047                             )
3048
3049     feedback_fn("* creating instance disks...")
3050     if not _CreateDisks(self.cfg, iobj):
3051       _RemoveDisks(iobj, self.cfg)
3052       raise errors.OpExecError("Device creation failed, reverting...")
3053
3054     feedback_fn("adding instance %s to cluster config" % instance)
3055
3056     self.cfg.AddInstance(iobj)
3057
3058     if self.op.wait_for_sync:
3059       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3060     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3061       # make sure the disks are not degraded (still sync-ing is ok)
3062       time.sleep(15)
3063       feedback_fn("* checking mirrors status")
3064       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3065     else:
3066       disk_abort = False
3067
3068     if disk_abort:
3069       _RemoveDisks(iobj, self.cfg)
3070       self.cfg.RemoveInstance(iobj.name)
3071       raise errors.OpExecError("There are some degraded disks for"
3072                                " this instance")
3073
3074     feedback_fn("creating os for instance %s on node %s" %
3075                 (instance, pnode_name))
3076
3077     if iobj.disk_template != constants.DT_DISKLESS:
3078       if self.op.mode == constants.INSTANCE_CREATE:
3079         feedback_fn("* running the instance OS create scripts...")
3080         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3081           raise errors.OpExecError("could not add os for instance %s"
3082                                    " on node %s" %
3083                                    (instance, pnode_name))
3084
3085       elif self.op.mode == constants.INSTANCE_IMPORT:
3086         feedback_fn("* running the instance OS import scripts...")
3087         src_node = self.op.src_node
3088         src_image = self.src_image
3089         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3090                                                 src_node, src_image):
3091           raise errors.OpExecError("Could not import os for instance"
3092                                    " %s on node %s" %
3093                                    (instance, pnode_name))
3094       else:
3095         # also checked in the prereq part
3096         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3097                                      % self.op.mode)
3098
3099     if self.op.start:
3100       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3101       feedback_fn("* starting instance...")
3102       if not rpc.call_instance_start(pnode_name, iobj, None):
3103         raise errors.OpExecError("Could not start instance")
3104
3105
3106 class LUConnectConsole(NoHooksLU):
3107   """Connect to an instance's console.
3108
3109   This is somewhat special in that it returns the command line that
3110   you need to run on the master node in order to connect to the
3111   console.
3112
3113   """
3114   _OP_REQP = ["instance_name"]
3115
3116   def CheckPrereq(self):
3117     """Check prerequisites.
3118
3119     This checks that the instance is in the cluster.
3120
3121     """
3122     instance = self.cfg.GetInstanceInfo(
3123       self.cfg.ExpandInstanceName(self.op.instance_name))
3124     if instance is None:
3125       raise errors.OpPrereqError("Instance '%s' not known" %
3126                                  self.op.instance_name)
3127     self.instance = instance
3128
3129   def Exec(self, feedback_fn):
3130     """Connect to the console of an instance
3131
3132     """
3133     instance = self.instance
3134     node = instance.primary_node
3135
3136     node_insts = rpc.call_instance_list([node])[node]
3137     if node_insts is False:
3138       raise errors.OpExecError("Can't connect to node %s." % node)
3139
3140     if instance.name not in node_insts:
3141       raise errors.OpExecError("Instance %s is not running." % instance.name)
3142
3143     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3144
3145     hyper = hypervisor.GetHypervisor()
3146     console_cmd = hyper.GetShellCommandForConsole(instance.name)
3147     # build ssh cmdline
3148     argv = ["ssh", "-q", "-t"]
3149     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3150     argv.extend(ssh.BATCH_MODE_OPTS)
3151     argv.append(node)
3152     argv.append(console_cmd)
3153     return "ssh", argv
3154
3155
3156 class LUAddMDDRBDComponent(LogicalUnit):
3157   """Adda new mirror member to an instance's disk.
3158
3159   """
3160   HPATH = "mirror-add"
3161   HTYPE = constants.HTYPE_INSTANCE
3162   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3163
3164   def BuildHooksEnv(self):
3165     """Build hooks env.
3166
3167     This runs on the master, the primary and all the secondaries.
3168
3169     """
3170     env = {
3171       "NEW_SECONDARY": self.op.remote_node,
3172       "DISK_NAME": self.op.disk_name,
3173       }
3174     env.update(_BuildInstanceHookEnvByObject(self.instance))
3175     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3176           self.op.remote_node,] + list(self.instance.secondary_nodes)
3177     return env, nl, nl
3178
3179   def CheckPrereq(self):
3180     """Check prerequisites.
3181
3182     This checks that the instance is in the cluster.
3183
3184     """
3185     instance = self.cfg.GetInstanceInfo(
3186       self.cfg.ExpandInstanceName(self.op.instance_name))
3187     if instance is None:
3188       raise errors.OpPrereqError("Instance '%s' not known" %
3189                                  self.op.instance_name)
3190     self.instance = instance
3191
3192     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3193     if remote_node is None:
3194       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3195     self.remote_node = remote_node
3196
3197     if remote_node == instance.primary_node:
3198       raise errors.OpPrereqError("The specified node is the primary node of"
3199                                  " the instance.")
3200
3201     if instance.disk_template != constants.DT_REMOTE_RAID1:
3202       raise errors.OpPrereqError("Instance's disk layout is not"
3203                                  " remote_raid1.")
3204     for disk in instance.disks:
3205       if disk.iv_name == self.op.disk_name:
3206         break
3207     else:
3208       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3209                                  " instance." % self.op.disk_name)
3210     if len(disk.children) > 1:
3211       raise errors.OpPrereqError("The device already has two slave devices."
3212                                  " This would create a 3-disk raid1 which we"
3213                                  " don't allow.")
3214     self.disk = disk
3215
3216   def Exec(self, feedback_fn):
3217     """Add the mirror component
3218
3219     """
3220     disk = self.disk
3221     instance = self.instance
3222
3223     remote_node = self.remote_node
3224     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3225     names = _GenerateUniqueNames(self.cfg, lv_names)
3226     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3227                                      remote_node, disk.size, names)
3228
3229     logger.Info("adding new mirror component on secondary")
3230     #HARDCODE
3231     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3232                                       new_drbd, False,
3233                                       _GetInstanceInfoText(instance)):
3234       raise errors.OpExecError("Failed to create new component on secondary"
3235                                " node %s" % remote_node)
3236
3237     logger.Info("adding new mirror component on primary")
3238     #HARDCODE
3239     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3240                                     instance, new_drbd,
3241                                     _GetInstanceInfoText(instance)):
3242       # remove secondary dev
3243       self.cfg.SetDiskID(new_drbd, remote_node)
3244       rpc.call_blockdev_remove(remote_node, new_drbd)
3245       raise errors.OpExecError("Failed to create volume on primary")
3246
3247     # the device exists now
3248     # call the primary node to add the mirror to md
3249     logger.Info("adding new mirror component to md")
3250     if not rpc.call_blockdev_addchildren(instance.primary_node,
3251                                          disk, [new_drbd]):
3252       logger.Error("Can't add mirror compoment to md!")
3253       self.cfg.SetDiskID(new_drbd, remote_node)
3254       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3255         logger.Error("Can't rollback on secondary")
3256       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3257       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3258         logger.Error("Can't rollback on primary")
3259       raise errors.OpExecError("Can't add mirror component to md array")
3260
3261     disk.children.append(new_drbd)
3262
3263     self.cfg.AddInstance(instance)
3264
3265     _WaitForSync(self.cfg, instance, self.proc)
3266
3267     return 0
3268
3269
3270 class LURemoveMDDRBDComponent(LogicalUnit):
3271   """Remove a component from a remote_raid1 disk.
3272
3273   """
3274   HPATH = "mirror-remove"
3275   HTYPE = constants.HTYPE_INSTANCE
3276   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3277
3278   def BuildHooksEnv(self):
3279     """Build hooks env.
3280
3281     This runs on the master, the primary and all the secondaries.
3282
3283     """
3284     env = {
3285       "DISK_NAME": self.op.disk_name,
3286       "DISK_ID": self.op.disk_id,
3287       "OLD_SECONDARY": self.old_secondary,
3288       }
3289     env.update(_BuildInstanceHookEnvByObject(self.instance))
3290     nl = [self.sstore.GetMasterNode(),
3291           self.instance.primary_node] + list(self.instance.secondary_nodes)
3292     return env, nl, nl
3293
3294   def CheckPrereq(self):
3295     """Check prerequisites.
3296
3297     This checks that the instance is in the cluster.
3298
3299     """
3300     instance = self.cfg.GetInstanceInfo(
3301       self.cfg.ExpandInstanceName(self.op.instance_name))
3302     if instance is None:
3303       raise errors.OpPrereqError("Instance '%s' not known" %
3304                                  self.op.instance_name)
3305     self.instance = instance
3306
3307     if instance.disk_template != constants.DT_REMOTE_RAID1:
3308       raise errors.OpPrereqError("Instance's disk layout is not"
3309                                  " remote_raid1.")
3310     for disk in instance.disks:
3311       if disk.iv_name == self.op.disk_name:
3312         break
3313     else:
3314       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3315                                  " instance." % self.op.disk_name)
3316     for child in disk.children:
3317       if (child.dev_type == constants.LD_DRBD7 and
3318           child.logical_id[2] == self.op.disk_id):
3319         break
3320     else:
3321       raise errors.OpPrereqError("Can't find the device with this port.")
3322
3323     if len(disk.children) < 2:
3324       raise errors.OpPrereqError("Cannot remove the last component from"
3325                                  " a mirror.")
3326     self.disk = disk
3327     self.child = child
3328     if self.child.logical_id[0] == instance.primary_node:
3329       oid = 1
3330     else:
3331       oid = 0
3332     self.old_secondary = self.child.logical_id[oid]
3333
3334   def Exec(self, feedback_fn):
3335     """Remove the mirror component
3336
3337     """
3338     instance = self.instance
3339     disk = self.disk
3340     child = self.child
3341     logger.Info("remove mirror component")
3342     self.cfg.SetDiskID(disk, instance.primary_node)
3343     if not rpc.call_blockdev_removechildren(instance.primary_node,
3344                                             disk, [child]):
3345       raise errors.OpExecError("Can't remove child from mirror.")
3346
3347     for node in child.logical_id[:2]:
3348       self.cfg.SetDiskID(child, node)
3349       if not rpc.call_blockdev_remove(node, child):
3350         logger.Error("Warning: failed to remove device from node %s,"
3351                      " continuing operation." % node)
3352
3353     disk.children.remove(child)
3354     self.cfg.AddInstance(instance)
3355
3356
3357 class LUReplaceDisks(LogicalUnit):
3358   """Replace the disks of an instance.
3359
3360   """
3361   HPATH = "mirrors-replace"
3362   HTYPE = constants.HTYPE_INSTANCE
3363   _OP_REQP = ["instance_name", "mode", "disks"]
3364
3365   def BuildHooksEnv(self):
3366     """Build hooks env.
3367
3368     This runs on the master, the primary and all the secondaries.
3369
3370     """
3371     env = {
3372       "MODE": self.op.mode,
3373       "NEW_SECONDARY": self.op.remote_node,
3374       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3375       }
3376     env.update(_BuildInstanceHookEnvByObject(self.instance))
3377     nl = [
3378       self.sstore.GetMasterNode(),
3379       self.instance.primary_node,
3380       ]
3381     if self.op.remote_node is not None:
3382       nl.append(self.op.remote_node)
3383     return env, nl, nl
3384
3385   def CheckPrereq(self):
3386     """Check prerequisites.
3387
3388     This checks that the instance is in the cluster.
3389
3390     """
3391     instance = self.cfg.GetInstanceInfo(
3392       self.cfg.ExpandInstanceName(self.op.instance_name))
3393     if instance is None:
3394       raise errors.OpPrereqError("Instance '%s' not known" %
3395                                  self.op.instance_name)
3396     self.instance = instance
3397     self.op.instance_name = instance.name
3398
3399     if instance.disk_template not in constants.DTS_NET_MIRROR:
3400       raise errors.OpPrereqError("Instance's disk layout is not"
3401                                  " network mirrored.")
3402
3403     if len(instance.secondary_nodes) != 1:
3404       raise errors.OpPrereqError("The instance has a strange layout,"
3405                                  " expected one secondary but found %d" %
3406                                  len(instance.secondary_nodes))
3407
3408     self.sec_node = instance.secondary_nodes[0]
3409
3410     remote_node = getattr(self.op, "remote_node", None)
3411     if remote_node is not None:
3412       remote_node = self.cfg.ExpandNodeName(remote_node)
3413       if remote_node is None:
3414         raise errors.OpPrereqError("Node '%s' not known" %
3415                                    self.op.remote_node)
3416       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3417     else:
3418       self.remote_node_info = None
3419     if remote_node == instance.primary_node:
3420       raise errors.OpPrereqError("The specified node is the primary node of"
3421                                  " the instance.")
3422     elif remote_node == self.sec_node:
3423       if self.op.mode == constants.REPLACE_DISK_SEC:
3424         # this is for DRBD8, where we can't execute the same mode of
3425         # replacement as for drbd7 (no different port allocated)
3426         raise errors.OpPrereqError("Same secondary given, cannot execute"
3427                                    " replacement")
3428       # the user gave the current secondary, switch to
3429       # 'no-replace-secondary' mode for drbd7
3430       remote_node = None
3431     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3432         self.op.mode != constants.REPLACE_DISK_ALL):
3433       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3434                                  " disks replacement, not individual ones")
3435     if instance.disk_template == constants.DT_DRBD8:
3436       if (self.op.mode == constants.REPLACE_DISK_ALL and
3437           remote_node is not None):
3438         # switch to replace secondary mode
3439         self.op.mode = constants.REPLACE_DISK_SEC
3440
3441       if self.op.mode == constants.REPLACE_DISK_ALL:
3442         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3443                                    " secondary disk replacement, not"
3444                                    " both at once")
3445       elif self.op.mode == constants.REPLACE_DISK_PRI:
3446         if remote_node is not None:
3447           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3448                                      " the secondary while doing a primary"
3449                                      " node disk replacement")
3450         self.tgt_node = instance.primary_node
3451         self.oth_node = instance.secondary_nodes[0]
3452       elif self.op.mode == constants.REPLACE_DISK_SEC:
3453         self.new_node = remote_node # this can be None, in which case
3454                                     # we don't change the secondary
3455         self.tgt_node = instance.secondary_nodes[0]
3456         self.oth_node = instance.primary_node
3457       else:
3458         raise errors.ProgrammerError("Unhandled disk replace mode")
3459
3460     for name in self.op.disks:
3461       if instance.FindDisk(name) is None:
3462         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3463                                    (name, instance.name))
3464     self.op.remote_node = remote_node
3465
3466   def _ExecRR1(self, feedback_fn):
3467     """Replace the disks of an instance.
3468
3469     """
3470     instance = self.instance
3471     iv_names = {}
3472     # start of work
3473     if self.op.remote_node is None:
3474       remote_node = self.sec_node
3475     else:
3476       remote_node = self.op.remote_node
3477     cfg = self.cfg
3478     for dev in instance.disks:
3479       size = dev.size
3480       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3481       names = _GenerateUniqueNames(cfg, lv_names)
3482       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3483                                        remote_node, size, names)
3484       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3485       logger.Info("adding new mirror component on secondary for %s" %
3486                   dev.iv_name)
3487       #HARDCODE
3488       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3489                                         new_drbd, False,
3490                                         _GetInstanceInfoText(instance)):
3491         raise errors.OpExecError("Failed to create new component on secondary"
3492                                  " node %s. Full abort, cleanup manually!" %
3493                                  remote_node)
3494
3495       logger.Info("adding new mirror component on primary")
3496       #HARDCODE
3497       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3498                                       instance, new_drbd,
3499                                       _GetInstanceInfoText(instance)):
3500         # remove secondary dev
3501         cfg.SetDiskID(new_drbd, remote_node)
3502         rpc.call_blockdev_remove(remote_node, new_drbd)
3503         raise errors.OpExecError("Failed to create volume on primary!"
3504                                  " Full abort, cleanup manually!!")
3505
3506       # the device exists now
3507       # call the primary node to add the mirror to md
3508       logger.Info("adding new mirror component to md")
3509       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3510                                            [new_drbd]):
3511         logger.Error("Can't add mirror compoment to md!")
3512         cfg.SetDiskID(new_drbd, remote_node)
3513         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3514           logger.Error("Can't rollback on secondary")
3515         cfg.SetDiskID(new_drbd, instance.primary_node)
3516         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3517           logger.Error("Can't rollback on primary")
3518         raise errors.OpExecError("Full abort, cleanup manually!!")
3519
3520       dev.children.append(new_drbd)
3521       cfg.AddInstance(instance)
3522
3523     # this can fail as the old devices are degraded and _WaitForSync
3524     # does a combined result over all disks, so we don't check its
3525     # return value
3526     _WaitForSync(cfg, instance, self.proc, unlock=True)
3527
3528     # so check manually all the devices
3529     for name in iv_names:
3530       dev, child, new_drbd = iv_names[name]
3531       cfg.SetDiskID(dev, instance.primary_node)
3532       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3533       if is_degr:
3534         raise errors.OpExecError("MD device %s is degraded!" % name)
3535       cfg.SetDiskID(new_drbd, instance.primary_node)
3536       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3537       if is_degr:
3538         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3539
3540     for name in iv_names:
3541       dev, child, new_drbd = iv_names[name]
3542       logger.Info("remove mirror %s component" % name)
3543       cfg.SetDiskID(dev, instance.primary_node)
3544       if not rpc.call_blockdev_removechildren(instance.primary_node,
3545                                               dev, [child]):
3546         logger.Error("Can't remove child from mirror, aborting"
3547                      " *this device cleanup*.\nYou need to cleanup manually!!")
3548         continue
3549
3550       for node in child.logical_id[:2]:
3551         logger.Info("remove child device on %s" % node)
3552         cfg.SetDiskID(child, node)
3553         if not rpc.call_blockdev_remove(node, child):
3554           logger.Error("Warning: failed to remove device from node %s,"
3555                        " continuing operation." % node)
3556
3557       dev.children.remove(child)
3558
3559       cfg.AddInstance(instance)
3560
3561   def _ExecD8DiskOnly(self, feedback_fn):
3562     """Replace a disk on the primary or secondary for dbrd8.
3563
3564     The algorithm for replace is quite complicated:
3565       - for each disk to be replaced:
3566         - create new LVs on the target node with unique names
3567         - detach old LVs from the drbd device
3568         - rename old LVs to name_replaced.<time_t>
3569         - rename new LVs to old LVs
3570         - attach the new LVs (with the old names now) to the drbd device
3571       - wait for sync across all devices
3572       - for each modified disk:
3573         - remove old LVs (which have the name name_replaces.<time_t>)
3574
3575     Failures are not very well handled.
3576
3577     """
3578     steps_total = 6
3579     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3580     instance = self.instance
3581     iv_names = {}
3582     vgname = self.cfg.GetVGName()
3583     # start of work
3584     cfg = self.cfg
3585     tgt_node = self.tgt_node
3586     oth_node = self.oth_node
3587
3588     # Step: check device activation
3589     self.proc.LogStep(1, steps_total, "check device existence")
3590     info("checking volume groups")
3591     my_vg = cfg.GetVGName()
3592     results = rpc.call_vg_list([oth_node, tgt_node])
3593     if not results:
3594       raise errors.OpExecError("Can't list volume groups on the nodes")
3595     for node in oth_node, tgt_node:
3596       res = results.get(node, False)
3597       if not res or my_vg not in res:
3598         raise errors.OpExecError("Volume group '%s' not found on %s" %
3599                                  (my_vg, node))
3600     for dev in instance.disks:
3601       if not dev.iv_name in self.op.disks:
3602         continue
3603       for node in tgt_node, oth_node:
3604         info("checking %s on %s" % (dev.iv_name, node))
3605         cfg.SetDiskID(dev, node)
3606         if not rpc.call_blockdev_find(node, dev):
3607           raise errors.OpExecError("Can't find device %s on node %s" %
3608                                    (dev.iv_name, node))
3609
3610     # Step: check other node consistency
3611     self.proc.LogStep(2, steps_total, "check peer consistency")
3612     for dev in instance.disks:
3613       if not dev.iv_name in self.op.disks:
3614         continue
3615       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3616       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3617                                    oth_node==instance.primary_node):
3618         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3619                                  " to replace disks on this node (%s)" %
3620                                  (oth_node, tgt_node))
3621
3622     # Step: create new storage
3623     self.proc.LogStep(3, steps_total, "allocate new storage")
3624     for dev in instance.disks:
3625       if not dev.iv_name in self.op.disks:
3626         continue
3627       size = dev.size
3628       cfg.SetDiskID(dev, tgt_node)
3629       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3630       names = _GenerateUniqueNames(cfg, lv_names)
3631       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3632                              logical_id=(vgname, names[0]))
3633       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3634                              logical_id=(vgname, names[1]))
3635       new_lvs = [lv_data, lv_meta]
3636       old_lvs = dev.children
3637       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3638       info("creating new local storage on %s for %s" %
3639            (tgt_node, dev.iv_name))
3640       # since we *always* want to create this LV, we use the
3641       # _Create...OnPrimary (which forces the creation), even if we
3642       # are talking about the secondary node
3643       for new_lv in new_lvs:
3644         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3645                                         _GetInstanceInfoText(instance)):
3646           raise errors.OpExecError("Failed to create new LV named '%s' on"
3647                                    " node '%s'" %
3648                                    (new_lv.logical_id[1], tgt_node))
3649
3650     # Step: for each lv, detach+rename*2+attach
3651     self.proc.LogStep(4, steps_total, "change drbd configuration")
3652     for dev, old_lvs, new_lvs in iv_names.itervalues():
3653       info("detaching %s drbd from local storage" % dev.iv_name)
3654       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3655         raise errors.OpExecError("Can't detach drbd from local storage on node"
3656                                  " %s for device %s" % (tgt_node, dev.iv_name))
3657       #dev.children = []
3658       #cfg.Update(instance)
3659
3660       # ok, we created the new LVs, so now we know we have the needed
3661       # storage; as such, we proceed on the target node to rename
3662       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3663       # using the assumption than logical_id == physical_id (which in
3664       # turn is the unique_id on that node)
3665
3666       # FIXME(iustin): use a better name for the replaced LVs
3667       temp_suffix = int(time.time())
3668       ren_fn = lambda d, suff: (d.physical_id[0],
3669                                 d.physical_id[1] + "_replaced-%s" % suff)
3670       # build the rename list based on what LVs exist on the node
3671       rlist = []
3672       for to_ren in old_lvs:
3673         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3674         if find_res is not None: # device exists
3675           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3676
3677       info("renaming the old LVs on the target node")
3678       if not rpc.call_blockdev_rename(tgt_node, rlist):
3679         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3680       # now we rename the new LVs to the old LVs
3681       info("renaming the new LVs on the target node")
3682       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3683       if not rpc.call_blockdev_rename(tgt_node, rlist):
3684         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3685
3686       for old, new in zip(old_lvs, new_lvs):
3687         new.logical_id = old.logical_id
3688         cfg.SetDiskID(new, tgt_node)
3689
3690       for disk in old_lvs:
3691         disk.logical_id = ren_fn(disk, temp_suffix)
3692         cfg.SetDiskID(disk, tgt_node)
3693
3694       # now that the new lvs have the old name, we can add them to the device
3695       info("adding new mirror component on %s" % tgt_node)
3696       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3697         for new_lv in new_lvs:
3698           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3699             warning("Can't rollback device %s", "manually cleanup unused"
3700                     " logical volumes")
3701         raise errors.OpExecError("Can't add local storage to drbd")
3702
3703       dev.children = new_lvs
3704       cfg.Update(instance)
3705
3706     # Step: wait for sync
3707
3708     # this can fail as the old devices are degraded and _WaitForSync
3709     # does a combined result over all disks, so we don't check its
3710     # return value
3711     self.proc.LogStep(5, steps_total, "sync devices")
3712     _WaitForSync(cfg, instance, self.proc, unlock=True)
3713
3714     # so check manually all the devices
3715     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3716       cfg.SetDiskID(dev, instance.primary_node)
3717       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3718       if is_degr:
3719         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3720
3721     # Step: remove old storage
3722     self.proc.LogStep(6, steps_total, "removing old storage")
3723     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3724       info("remove logical volumes for %s" % name)
3725       for lv in old_lvs:
3726         cfg.SetDiskID(lv, tgt_node)
3727         if not rpc.call_blockdev_remove(tgt_node, lv):
3728           warning("Can't remove old LV", "manually remove unused LVs")
3729           continue
3730
3731   def _ExecD8Secondary(self, feedback_fn):
3732     """Replace the secondary node for drbd8.
3733
3734     The algorithm for replace is quite complicated:
3735       - for all disks of the instance:
3736         - create new LVs on the new node with same names
3737         - shutdown the drbd device on the old secondary
3738         - disconnect the drbd network on the primary
3739         - create the drbd device on the new secondary
3740         - network attach the drbd on the primary, using an artifice:
3741           the drbd code for Attach() will connect to the network if it
3742           finds a device which is connected to the good local disks but
3743           not network enabled
3744       - wait for sync across all devices
3745       - remove all disks from the old secondary
3746
3747     Failures are not very well handled.
3748
3749     """
3750     steps_total = 6
3751     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3752     instance = self.instance
3753     iv_names = {}
3754     vgname = self.cfg.GetVGName()
3755     # start of work
3756     cfg = self.cfg
3757     old_node = self.tgt_node
3758     new_node = self.new_node
3759     pri_node = instance.primary_node
3760
3761     # Step: check device activation
3762     self.proc.LogStep(1, steps_total, "check device existence")
3763     info("checking volume groups")
3764     my_vg = cfg.GetVGName()
3765     results = rpc.call_vg_list([pri_node, new_node])
3766     if not results:
3767       raise errors.OpExecError("Can't list volume groups on the nodes")
3768     for node in pri_node, new_node:
3769       res = results.get(node, False)
3770       if not res or my_vg not in res:
3771         raise errors.OpExecError("Volume group '%s' not found on %s" %
3772                                  (my_vg, node))
3773     for dev in instance.disks:
3774       if not dev.iv_name in self.op.disks:
3775         continue
3776       info("checking %s on %s" % (dev.iv_name, pri_node))
3777       cfg.SetDiskID(dev, pri_node)
3778       if not rpc.call_blockdev_find(pri_node, dev):
3779         raise errors.OpExecError("Can't find device %s on node %s" %
3780                                  (dev.iv_name, pri_node))
3781
3782     # Step: check other node consistency
3783     self.proc.LogStep(2, steps_total, "check peer consistency")
3784     for dev in instance.disks:
3785       if not dev.iv_name in self.op.disks:
3786         continue
3787       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3788       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3789         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3790                                  " unsafe to replace the secondary" %
3791                                  pri_node)
3792
3793     # Step: create new storage
3794     self.proc.LogStep(3, steps_total, "allocate new storage")
3795     for dev in instance.disks:
3796       size = dev.size
3797       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3798       # since we *always* want to create this LV, we use the
3799       # _Create...OnPrimary (which forces the creation), even if we
3800       # are talking about the secondary node
3801       for new_lv in dev.children:
3802         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3803                                         _GetInstanceInfoText(instance)):
3804           raise errors.OpExecError("Failed to create new LV named '%s' on"
3805                                    " node '%s'" %
3806                                    (new_lv.logical_id[1], new_node))
3807
3808       iv_names[dev.iv_name] = (dev, dev.children)
3809
3810     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3811     for dev in instance.disks:
3812       size = dev.size
3813       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3814       # create new devices on new_node
3815       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3816                               logical_id=(pri_node, new_node,
3817                                           dev.logical_id[2]),
3818                               children=dev.children)
3819       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3820                                         new_drbd, False,
3821                                       _GetInstanceInfoText(instance)):
3822         raise errors.OpExecError("Failed to create new DRBD on"
3823                                  " node '%s'" % new_node)
3824
3825     for dev in instance.disks:
3826       # we have new devices, shutdown the drbd on the old secondary
3827       info("shutting down drbd for %s on old node" % dev.iv_name)
3828       cfg.SetDiskID(dev, old_node)
3829       if not rpc.call_blockdev_shutdown(old_node, dev):
3830         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3831                 "Please cleanup this device manually as soon as possible")
3832
3833     info("detaching primary drbds from the network (=> standalone)")
3834     done = 0
3835     for dev in instance.disks:
3836       cfg.SetDiskID(dev, pri_node)
3837       # set the physical (unique in bdev terms) id to None, meaning
3838       # detach from network
3839       dev.physical_id = (None,) * len(dev.physical_id)
3840       # and 'find' the device, which will 'fix' it to match the
3841       # standalone state
3842       if rpc.call_blockdev_find(pri_node, dev):
3843         done += 1
3844       else:
3845         warning("Failed to detach drbd %s from network, unusual case" %
3846                 dev.iv_name)
3847
3848     if not done:
3849       # no detaches succeeded (very unlikely)
3850       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3851
3852     # if we managed to detach at least one, we update all the disks of
3853     # the instance to point to the new secondary
3854     info("updating instance configuration")
3855     for dev in instance.disks:
3856       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3857       cfg.SetDiskID(dev, pri_node)
3858     cfg.Update(instance)
3859
3860     # and now perform the drbd attach
3861     info("attaching primary drbds to new secondary (standalone => connected)")
3862     failures = []
3863     for dev in instance.disks:
3864       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3865       # since the attach is smart, it's enough to 'find' the device,
3866       # it will automatically activate the network, if the physical_id
3867       # is correct
3868       cfg.SetDiskID(dev, pri_node)
3869       if not rpc.call_blockdev_find(pri_node, dev):
3870         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3871                 "please do a gnt-instance info to see the status of disks")
3872
3873     # this can fail as the old devices are degraded and _WaitForSync
3874     # does a combined result over all disks, so we don't check its
3875     # return value
3876     self.proc.LogStep(5, steps_total, "sync devices")
3877     _WaitForSync(cfg, instance, self.proc, unlock=True)
3878
3879     # so check manually all the devices
3880     for name, (dev, old_lvs) in iv_names.iteritems():
3881       cfg.SetDiskID(dev, pri_node)
3882       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3883       if is_degr:
3884         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3885
3886     self.proc.LogStep(6, steps_total, "removing old storage")
3887     for name, (dev, old_lvs) in iv_names.iteritems():
3888       info("remove logical volumes for %s" % name)
3889       for lv in old_lvs:
3890         cfg.SetDiskID(lv, old_node)
3891         if not rpc.call_blockdev_remove(old_node, lv):
3892           warning("Can't remove LV on old secondary",
3893                   "Cleanup stale volumes by hand")
3894
3895   def Exec(self, feedback_fn):
3896     """Execute disk replacement.
3897
3898     This dispatches the disk replacement to the appropriate handler.
3899
3900     """
3901     instance = self.instance
3902     if instance.disk_template == constants.DT_REMOTE_RAID1:
3903       fn = self._ExecRR1
3904     elif instance.disk_template == constants.DT_DRBD8:
3905       if self.op.remote_node is None:
3906         fn = self._ExecD8DiskOnly
3907       else:
3908         fn = self._ExecD8Secondary
3909     else:
3910       raise errors.ProgrammerError("Unhandled disk replacement case")
3911     return fn(feedback_fn)
3912
3913
3914 class LUQueryInstanceData(NoHooksLU):
3915   """Query runtime instance data.
3916
3917   """
3918   _OP_REQP = ["instances"]
3919
3920   def CheckPrereq(self):
3921     """Check prerequisites.
3922
3923     This only checks the optional instance list against the existing names.
3924
3925     """
3926     if not isinstance(self.op.instances, list):
3927       raise errors.OpPrereqError("Invalid argument type 'instances'")
3928     if self.op.instances:
3929       self.wanted_instances = []
3930       names = self.op.instances
3931       for name in names:
3932         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3933         if instance is None:
3934           raise errors.OpPrereqError("No such instance name '%s'" % name)
3935       self.wanted_instances.append(instance)
3936     else:
3937       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3938                                in self.cfg.GetInstanceList()]
3939     return
3940
3941
3942   def _ComputeDiskStatus(self, instance, snode, dev):
3943     """Compute block device status.
3944
3945     """
3946     self.cfg.SetDiskID(dev, instance.primary_node)
3947     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3948     if dev.dev_type in constants.LDS_DRBD:
3949       # we change the snode then (otherwise we use the one passed in)
3950       if dev.logical_id[0] == instance.primary_node:
3951         snode = dev.logical_id[1]
3952       else:
3953         snode = dev.logical_id[0]
3954
3955     if snode:
3956       self.cfg.SetDiskID(dev, snode)
3957       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3958     else:
3959       dev_sstatus = None
3960
3961     if dev.children:
3962       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3963                       for child in dev.children]
3964     else:
3965       dev_children = []
3966
3967     data = {
3968       "iv_name": dev.iv_name,
3969       "dev_type": dev.dev_type,
3970       "logical_id": dev.logical_id,
3971       "physical_id": dev.physical_id,
3972       "pstatus": dev_pstatus,
3973       "sstatus": dev_sstatus,
3974       "children": dev_children,
3975       }
3976
3977     return data
3978
3979   def Exec(self, feedback_fn):
3980     """Gather and return data"""
3981     result = {}
3982     for instance in self.wanted_instances:
3983       remote_info = rpc.call_instance_info(instance.primary_node,
3984                                                 instance.name)
3985       if remote_info and "state" in remote_info:
3986         remote_state = "up"
3987       else:
3988         remote_state = "down"
3989       if instance.status == "down":
3990         config_state = "down"
3991       else:
3992         config_state = "up"
3993
3994       disks = [self._ComputeDiskStatus(instance, None, device)
3995                for device in instance.disks]
3996
3997       idict = {
3998         "name": instance.name,
3999         "config_state": config_state,
4000         "run_state": remote_state,
4001         "pnode": instance.primary_node,
4002         "snodes": instance.secondary_nodes,
4003         "os": instance.os,
4004         "memory": instance.memory,
4005         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4006         "disks": disks,
4007         "network_port": instance.network_port,
4008         "vcpus": instance.vcpus,
4009         }
4010
4011       result[instance.name] = idict
4012
4013     return result
4014
4015
4016 class LUSetInstanceParms(LogicalUnit):
4017   """Modifies an instances's parameters.
4018
4019   """
4020   HPATH = "instance-modify"
4021   HTYPE = constants.HTYPE_INSTANCE
4022   _OP_REQP = ["instance_name"]
4023
4024   def BuildHooksEnv(self):
4025     """Build hooks env.
4026
4027     This runs on the master, primary and secondaries.
4028
4029     """
4030     args = dict()
4031     if self.mem:
4032       args['memory'] = self.mem
4033     if self.vcpus:
4034       args['vcpus'] = self.vcpus
4035     if self.do_ip or self.do_bridge:
4036       if self.do_ip:
4037         ip = self.ip
4038       else:
4039         ip = self.instance.nics[0].ip
4040       if self.bridge:
4041         bridge = self.bridge
4042       else:
4043         bridge = self.instance.nics[0].bridge
4044       args['nics'] = [(ip, bridge)]
4045     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4046     nl = [self.sstore.GetMasterNode(),
4047           self.instance.primary_node] + list(self.instance.secondary_nodes)
4048     return env, nl, nl
4049
4050   def CheckPrereq(self):
4051     """Check prerequisites.
4052
4053     This only checks the instance list against the existing names.
4054
4055     """
4056     self.mem = getattr(self.op, "mem", None)
4057     self.vcpus = getattr(self.op, "vcpus", None)
4058     self.ip = getattr(self.op, "ip", None)
4059     self.bridge = getattr(self.op, "bridge", None)
4060     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
4061       raise errors.OpPrereqError("No changes submitted")
4062     if self.mem is not None:
4063       try:
4064         self.mem = int(self.mem)
4065       except ValueError, err:
4066         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4067     if self.vcpus is not None:
4068       try:
4069         self.vcpus = int(self.vcpus)
4070       except ValueError, err:
4071         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4072     if self.ip is not None:
4073       self.do_ip = True
4074       if self.ip.lower() == "none":
4075         self.ip = None
4076       else:
4077         if not utils.IsValidIP(self.ip):
4078           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4079     else:
4080       self.do_ip = False
4081     self.do_bridge = (self.bridge is not None)
4082
4083     instance = self.cfg.GetInstanceInfo(
4084       self.cfg.ExpandInstanceName(self.op.instance_name))
4085     if instance is None:
4086       raise errors.OpPrereqError("No such instance name '%s'" %
4087                                  self.op.instance_name)
4088     self.op.instance_name = instance.name
4089     self.instance = instance
4090     return
4091
4092   def Exec(self, feedback_fn):
4093     """Modifies an instance.
4094
4095     All parameters take effect only at the next restart of the instance.
4096     """
4097     result = []
4098     instance = self.instance
4099     if self.mem:
4100       instance.memory = self.mem
4101       result.append(("mem", self.mem))
4102     if self.vcpus:
4103       instance.vcpus = self.vcpus
4104       result.append(("vcpus",  self.vcpus))
4105     if self.do_ip:
4106       instance.nics[0].ip = self.ip
4107       result.append(("ip", self.ip))
4108     if self.bridge:
4109       instance.nics[0].bridge = self.bridge
4110       result.append(("bridge", self.bridge))
4111
4112     self.cfg.AddInstance(instance)
4113
4114     return result
4115
4116
4117 class LUQueryExports(NoHooksLU):
4118   """Query the exports list
4119
4120   """
4121   _OP_REQP = []
4122
4123   def CheckPrereq(self):
4124     """Check that the nodelist contains only existing nodes.
4125
4126     """
4127     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4128
4129   def Exec(self, feedback_fn):
4130     """Compute the list of all the exported system images.
4131
4132     Returns:
4133       a dictionary with the structure node->(export-list)
4134       where export-list is a list of the instances exported on
4135       that node.
4136
4137     """
4138     return rpc.call_export_list(self.nodes)
4139
4140
4141 class LUExportInstance(LogicalUnit):
4142   """Export an instance to an image in the cluster.
4143
4144   """
4145   HPATH = "instance-export"
4146   HTYPE = constants.HTYPE_INSTANCE
4147   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4148
4149   def BuildHooksEnv(self):
4150     """Build hooks env.
4151
4152     This will run on the master, primary node and target node.
4153
4154     """
4155     env = {
4156       "EXPORT_NODE": self.op.target_node,
4157       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4158       }
4159     env.update(_BuildInstanceHookEnvByObject(self.instance))
4160     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4161           self.op.target_node]
4162     return env, nl, nl
4163
4164   def CheckPrereq(self):
4165     """Check prerequisites.
4166
4167     This checks that the instance name is a valid one.
4168
4169     """
4170     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4171     self.instance = self.cfg.GetInstanceInfo(instance_name)
4172     if self.instance is None:
4173       raise errors.OpPrereqError("Instance '%s' not found" %
4174                                  self.op.instance_name)
4175
4176     # node verification
4177     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4178     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4179
4180     if self.dst_node is None:
4181       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4182                                  self.op.target_node)
4183     self.op.target_node = self.dst_node.name
4184
4185   def Exec(self, feedback_fn):
4186     """Export an instance to an image in the cluster.
4187
4188     """
4189     instance = self.instance
4190     dst_node = self.dst_node
4191     src_node = instance.primary_node
4192     # shutdown the instance, unless requested not to do so
4193     if self.op.shutdown:
4194       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4195       self.proc.ChainOpCode(op)
4196
4197     vgname = self.cfg.GetVGName()
4198
4199     snap_disks = []
4200
4201     try:
4202       for disk in instance.disks:
4203         if disk.iv_name == "sda":
4204           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4205           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4206
4207           if not new_dev_name:
4208             logger.Error("could not snapshot block device %s on node %s" %
4209                          (disk.logical_id[1], src_node))
4210           else:
4211             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4212                                       logical_id=(vgname, new_dev_name),
4213                                       physical_id=(vgname, new_dev_name),
4214                                       iv_name=disk.iv_name)
4215             snap_disks.append(new_dev)
4216
4217     finally:
4218       if self.op.shutdown:
4219         op = opcodes.OpStartupInstance(instance_name=instance.name,
4220                                        force=False)
4221         self.proc.ChainOpCode(op)
4222
4223     # TODO: check for size
4224
4225     for dev in snap_disks:
4226       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4227                                            instance):
4228         logger.Error("could not export block device %s from node"
4229                      " %s to node %s" %
4230                      (dev.logical_id[1], src_node, dst_node.name))
4231       if not rpc.call_blockdev_remove(src_node, dev):
4232         logger.Error("could not remove snapshot block device %s from"
4233                      " node %s" % (dev.logical_id[1], src_node))
4234
4235     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4236       logger.Error("could not finalize export for instance %s on node %s" %
4237                    (instance.name, dst_node.name))
4238
4239     nodelist = self.cfg.GetNodeList()
4240     nodelist.remove(dst_node.name)
4241
4242     # on one-node clusters nodelist will be empty after the removal
4243     # if we proceed the backup would be removed because OpQueryExports
4244     # substitutes an empty list with the full cluster node list.
4245     if nodelist:
4246       op = opcodes.OpQueryExports(nodes=nodelist)
4247       exportlist = self.proc.ChainOpCode(op)
4248       for node in exportlist:
4249         if instance.name in exportlist[node]:
4250           if not rpc.call_export_remove(node, instance.name):
4251             logger.Error("could not remove older export for instance %s"
4252                          " on node %s" % (instance.name, node))
4253
4254
4255 class TagsLU(NoHooksLU):
4256   """Generic tags LU.
4257
4258   This is an abstract class which is the parent of all the other tags LUs.
4259
4260   """
4261   def CheckPrereq(self):
4262     """Check prerequisites.
4263
4264     """
4265     if self.op.kind == constants.TAG_CLUSTER:
4266       self.target = self.cfg.GetClusterInfo()
4267     elif self.op.kind == constants.TAG_NODE:
4268       name = self.cfg.ExpandNodeName(self.op.name)
4269       if name is None:
4270         raise errors.OpPrereqError("Invalid node name (%s)" %
4271                                    (self.op.name,))
4272       self.op.name = name
4273       self.target = self.cfg.GetNodeInfo(name)
4274     elif self.op.kind == constants.TAG_INSTANCE:
4275       name = self.cfg.ExpandInstanceName(self.op.name)
4276       if name is None:
4277         raise errors.OpPrereqError("Invalid instance name (%s)" %
4278                                    (self.op.name,))
4279       self.op.name = name
4280       self.target = self.cfg.GetInstanceInfo(name)
4281     else:
4282       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4283                                  str(self.op.kind))
4284
4285
4286 class LUGetTags(TagsLU):
4287   """Returns the tags of a given object.
4288
4289   """
4290   _OP_REQP = ["kind", "name"]
4291
4292   def Exec(self, feedback_fn):
4293     """Returns the tag list.
4294
4295     """
4296     return self.target.GetTags()
4297
4298
4299 class LUSearchTags(NoHooksLU):
4300   """Searches the tags for a given pattern.
4301
4302   """
4303   _OP_REQP = ["pattern"]
4304
4305   def CheckPrereq(self):
4306     """Check prerequisites.
4307
4308     This checks the pattern passed for validity by compiling it.
4309
4310     """
4311     try:
4312       self.re = re.compile(self.op.pattern)
4313     except re.error, err:
4314       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4315                                  (self.op.pattern, err))
4316
4317   def Exec(self, feedback_fn):
4318     """Returns the tag list.
4319
4320     """
4321     cfg = self.cfg
4322     tgts = [("/cluster", cfg.GetClusterInfo())]
4323     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4324     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4325     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4326     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4327     results = []
4328     for path, target in tgts:
4329       for tag in target.GetTags():
4330         if self.re.search(tag):
4331           results.append((path, tag))
4332     return results
4333
4334
4335 class LUAddTags(TagsLU):
4336   """Sets a tag on a given object.
4337
4338   """
4339   _OP_REQP = ["kind", "name", "tags"]
4340
4341   def CheckPrereq(self):
4342     """Check prerequisites.
4343
4344     This checks the type and length of the tag name and value.
4345
4346     """
4347     TagsLU.CheckPrereq(self)
4348     for tag in self.op.tags:
4349       objects.TaggableObject.ValidateTag(tag)
4350
4351   def Exec(self, feedback_fn):
4352     """Sets the tag.
4353
4354     """
4355     try:
4356       for tag in self.op.tags:
4357         self.target.AddTag(tag)
4358     except errors.TagError, err:
4359       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4360     try:
4361       self.cfg.Update(self.target)
4362     except errors.ConfigurationError:
4363       raise errors.OpRetryError("There has been a modification to the"
4364                                 " config file and the operation has been"
4365                                 " aborted. Please retry.")
4366
4367
4368 class LUDelTags(TagsLU):
4369   """Delete a list of tags from a given object.
4370
4371   """
4372   _OP_REQP = ["kind", "name", "tags"]
4373
4374   def CheckPrereq(self):
4375     """Check prerequisites.
4376
4377     This checks that we have the given tag.
4378
4379     """
4380     TagsLU.CheckPrereq(self)
4381     for tag in self.op.tags:
4382       objects.TaggableObject.ValidateTag(tag)
4383     del_tags = frozenset(self.op.tags)
4384     cur_tags = self.target.GetTags()
4385     if not del_tags <= cur_tags:
4386       diff_tags = del_tags - cur_tags
4387       diff_names = ["'%s'" % tag for tag in diff_tags]
4388       diff_names.sort()
4389       raise errors.OpPrereqError("Tag(s) %s not found" %
4390                                  (",".join(diff_names)))
4391
4392   def Exec(self, feedback_fn):
4393     """Remove the tag from the object.
4394
4395     """
4396     for tag in self.op.tags:
4397       self.target.RemoveTag(tag)
4398     try:
4399       self.cfg.Update(self.target)
4400     except errors.ConfigurationError:
4401       raise errors.OpRetryError("There has been a modification to the"
4402                                 " config file and the operation has been"
4403                                 " aborted. Please retry.")