Fix logging of some checks in LUClusterVerify
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _AddHostToEtcHosts(hostname):
167   """Wrapper around utils.SetEtcHostsEntry.
168
169   """
170   hi = utils.HostInfo(name=hostname)
171   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172
173
174 def _RemoveHostFromEtcHosts(hostname):
175   """Wrapper around utils.RemoveEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181
182
183 def _GetWantedNodes(lu, nodes):
184   """Returns list of checked and expanded node names.
185
186   Args:
187     nodes: List of nodes (strings) or None for all
188
189   """
190   if not isinstance(nodes, list):
191     raise errors.OpPrereqError("Invalid argument type 'nodes'")
192
193   if nodes:
194     wanted = []
195
196     for name in nodes:
197       node = lu.cfg.ExpandNodeName(name)
198       if node is None:
199         raise errors.OpPrereqError("No such node name '%s'" % name)
200       wanted.append(node)
201
202   else:
203     wanted = lu.cfg.GetNodeList()
204   return utils.NiceSort(wanted)
205
206
207 def _GetWantedInstances(lu, instances):
208   """Returns list of checked and expanded instance names.
209
210   Args:
211     instances: List of instances (strings) or None for all
212
213   """
214   if not isinstance(instances, list):
215     raise errors.OpPrereqError("Invalid argument type 'instances'")
216
217   if instances:
218     wanted = []
219
220     for name in instances:
221       instance = lu.cfg.ExpandInstanceName(name)
222       if instance is None:
223         raise errors.OpPrereqError("No such instance name '%s'" % name)
224       wanted.append(instance)
225
226   else:
227     wanted = lu.cfg.GetInstanceList()
228   return utils.NiceSort(wanted)
229
230
231 def _CheckOutputFields(static, dynamic, selected):
232   """Checks whether all selected fields are valid.
233
234   Args:
235     static: Static fields
236     dynamic: Dynamic fields
237
238   """
239   static_fields = frozenset(static)
240   dynamic_fields = frozenset(dynamic)
241
242   all_fields = static_fields | dynamic_fields
243
244   if not all_fields.issuperset(selected):
245     raise errors.OpPrereqError("Unknown output fields selected: %s"
246                                % ",".join(frozenset(selected).
247                                           difference(all_fields)))
248
249
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251                           memory, vcpus, nics):
252   """Builds instance related env variables for hooks from single variables.
253
254   Args:
255     secondary_nodes: List of secondary nodes as strings
256   """
257   env = {
258     "OP_TARGET": name,
259     "INSTANCE_NAME": name,
260     "INSTANCE_PRIMARY": primary_node,
261     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262     "INSTANCE_OS_TYPE": os_type,
263     "INSTANCE_STATUS": status,
264     "INSTANCE_MEMORY": memory,
265     "INSTANCE_VCPUS": vcpus,
266   }
267
268   if nics:
269     nic_count = len(nics)
270     for idx, (ip, bridge) in enumerate(nics):
271       if ip is None:
272         ip = ""
273       env["INSTANCE_NIC%d_IP" % idx] = ip
274       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275   else:
276     nic_count = 0
277
278   env["INSTANCE_NIC_COUNT"] = nic_count
279
280   return env
281
282
283 def _BuildInstanceHookEnvByObject(instance, override=None):
284   """Builds instance related env variables for hooks from an object.
285
286   Args:
287     instance: objects.Instance object of instance
288     override: dict of values to override
289   """
290   args = {
291     'name': instance.name,
292     'primary_node': instance.primary_node,
293     'secondary_nodes': instance.secondary_nodes,
294     'os_type': instance.os,
295     'status': instance.os,
296     'memory': instance.memory,
297     'vcpus': instance.vcpus,
298     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
299   }
300   if override:
301     args.update(override)
302   return _BuildInstanceHookEnv(**args)
303
304
305 def _UpdateKnownHosts(fullnode, ip, pubkey):
306   """Ensure a node has a correct known_hosts entry.
307
308   Args:
309     fullnode - Fully qualified domain name of host. (str)
310     ip       - IPv4 address of host (str)
311     pubkey   - the public key of the cluster
312
313   """
314   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
316   else:
317     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
318
319   inthere = False
320
321   save_lines = []
322   add_lines = []
323   removed = False
324
325   for rawline in f:
326     logger.Debug('read %s' % (repr(rawline),))
327
328     parts = rawline.rstrip('\r\n').split()
329
330     # Ignore unwanted lines
331     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
332       fields = parts[0].split(',')
333       key = parts[2]
334
335       haveall = True
336       havesome = False
337       for spec in [ ip, fullnode ]:
338         if spec not in fields:
339           haveall = False
340         if spec in fields:
341           havesome = True
342
343       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344       if haveall and key == pubkey:
345         inthere = True
346         save_lines.append(rawline)
347         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
348         continue
349
350       if havesome and (not haveall or key != pubkey):
351         removed = True
352         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
353         continue
354
355     save_lines.append(rawline)
356
357   if not inthere:
358     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
360
361   if removed:
362     save_lines = save_lines + add_lines
363
364     # Write a new file and replace old.
365     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
366                                    constants.DATA_DIR)
367     newfile = os.fdopen(fd, 'w')
368     try:
369       newfile.write(''.join(save_lines))
370     finally:
371       newfile.close()
372     logger.Debug("Wrote new known_hosts.")
373     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
374
375   elif add_lines:
376     # Simply appending a new line will do the trick.
377     f.seek(0, 2)
378     for add in add_lines:
379       f.write(add)
380
381   f.close()
382
383
384 def _HasValidVG(vglist, vgname):
385   """Checks if the volume group list is valid.
386
387   A non-None return value means there's an error, and the return value
388   is the error message.
389
390   """
391   vgsize = vglist.get(vgname, None)
392   if vgsize is None:
393     return "volume group '%s' missing" % vgname
394   elif vgsize < 20480:
395     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
396             (vgname, vgsize))
397   return None
398
399
400 def _InitSSHSetup(node):
401   """Setup the SSH configuration for the cluster.
402
403
404   This generates a dsa keypair for root, adds the pub key to the
405   permitted hosts and adds the hostkey to its own known hosts.
406
407   Args:
408     node: the name of this host as a fqdn
409
410   """
411   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
412
413   for name in priv_key, pub_key:
414     if os.path.exists(name):
415       utils.CreateBackup(name)
416     utils.RemoveFile(name)
417
418   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
419                          "-f", priv_key,
420                          "-q", "-N", ""])
421   if result.failed:
422     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
423                              result.output)
424
425   f = open(pub_key, 'r')
426   try:
427     utils.AddAuthorizedKey(auth_keys, f.read(8192))
428   finally:
429     f.close()
430
431
432 def _InitGanetiServerSetup(ss):
433   """Setup the necessary configuration for the initial node daemon.
434
435   This creates the nodepass file containing the shared password for
436   the cluster and also generates the SSL certificate.
437
438   """
439   # Create pseudo random password
440   randpass = sha.new(os.urandom(64)).hexdigest()
441   # and write it into sstore
442   ss.SetKey(ss.SS_NODED_PASS, randpass)
443
444   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445                          "-days", str(365*5), "-nodes", "-x509",
446                          "-keyout", constants.SSL_CERT_FILE,
447                          "-out", constants.SSL_CERT_FILE, "-batch"])
448   if result.failed:
449     raise errors.OpExecError("could not generate server ssl cert, command"
450                              " %s had exitcode %s and error message %s" %
451                              (result.cmd, result.exit_code, result.output))
452
453   os.chmod(constants.SSL_CERT_FILE, 0400)
454
455   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
456
457   if result.failed:
458     raise errors.OpExecError("Could not start the node daemon, command %s"
459                              " had exitcode %s and error %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462
463 def _CheckInstanceBridgesExist(instance):
464   """Check that the brigdes needed by an instance exist.
465
466   """
467   # check bridges existance
468   brlist = [nic.bridge for nic in instance.nics]
469   if not rpc.call_bridges_exist(instance.primary_node, brlist):
470     raise errors.OpPrereqError("one or more target bridges %s does not"
471                                " exist on destination node '%s'" %
472                                (brlist, instance.primary_node))
473
474
475 class LUInitCluster(LogicalUnit):
476   """Initialise the cluster.
477
478   """
479   HPATH = "cluster-init"
480   HTYPE = constants.HTYPE_CLUSTER
481   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482               "def_bridge", "master_netdev"]
483   REQ_CLUSTER = False
484
485   def BuildHooksEnv(self):
486     """Build hooks env.
487
488     Notes: Since we don't require a cluster, we must manually add
489     ourselves in the post-run node list.
490
491     """
492     env = {"OP_TARGET": self.op.cluster_name}
493     return env, [], [self.hostname.name]
494
495   def CheckPrereq(self):
496     """Verify that the passed name is a valid one.
497
498     """
499     if config.ConfigWriter.IsCluster():
500       raise errors.OpPrereqError("Cluster is already initialised")
501
502     self.hostname = hostname = utils.HostInfo()
503
504     if hostname.ip.startswith("127."):
505       raise errors.OpPrereqError("This host's IP resolves to the private"
506                                  " range (%s). Please fix DNS or /etc/hosts." %
507                                  (hostname.ip,))
508
509     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
510
511     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
512                          constants.DEFAULT_NODED_PORT):
513       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
514                                  " to %s,\nbut this ip address does not"
515                                  " belong to this host."
516                                  " Aborting." % hostname.ip)
517
518     secondary_ip = getattr(self.op, "secondary_ip", None)
519     if secondary_ip and not utils.IsValidIP(secondary_ip):
520       raise errors.OpPrereqError("Invalid secondary ip given")
521     if (secondary_ip and
522         secondary_ip != hostname.ip and
523         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
524                            constants.DEFAULT_NODED_PORT))):
525       raise errors.OpPrereqError("You gave %s as secondary IP,\n"
526                                  "but it does not belong to this host." %
527                                  secondary_ip)
528     self.secondary_ip = secondary_ip
529
530     # checks presence of the volume group given
531     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
532
533     if vgstatus:
534       raise errors.OpPrereqError("Error: %s" % vgstatus)
535
536     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
537                     self.op.mac_prefix):
538       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
539                                  self.op.mac_prefix)
540
541     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
542       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
543                                  self.op.hypervisor_type)
544
545     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
546     if result.failed:
547       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
548                                  (self.op.master_netdev,
549                                   result.output.strip()))
550
551     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
552             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
553       raise errors.OpPrereqError("Init.d script '%s' missing or not "
554                                  "executable." % constants.NODE_INITD_SCRIPT)
555
556   def Exec(self, feedback_fn):
557     """Initialize the cluster.
558
559     """
560     clustername = self.clustername
561     hostname = self.hostname
562
563     # set up the simple store
564     self.sstore = ss = ssconf.SimpleStore()
565     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
566     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
567     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
568     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
569     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
570
571     # set up the inter-node password and certificate
572     _InitGanetiServerSetup(ss)
573
574     # start the master ip
575     rpc.call_node_start_master(hostname.name)
576
577     # set up ssh config and /etc/hosts
578     f = open(constants.SSH_HOST_RSA_PUB, 'r')
579     try:
580       sshline = f.read()
581     finally:
582       f.close()
583     sshkey = sshline.split(" ")[1]
584
585     _AddHostToEtcHosts(hostname.name)
586
587     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
588
589     _InitSSHSetup(hostname.name)
590
591     # init of cluster config file
592     self.cfg = cfgw = config.ConfigWriter()
593     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
594                     sshkey, self.op.mac_prefix,
595                     self.op.vg_name, self.op.def_bridge)
596
597
598 class LUDestroyCluster(NoHooksLU):
599   """Logical unit for destroying the cluster.
600
601   """
602   _OP_REQP = []
603
604   def CheckPrereq(self):
605     """Check prerequisites.
606
607     This checks whether the cluster is empty.
608
609     Any errors are signalled by raising errors.OpPrereqError.
610
611     """
612     master = self.sstore.GetMasterNode()
613
614     nodelist = self.cfg.GetNodeList()
615     if len(nodelist) != 1 or nodelist[0] != master:
616       raise errors.OpPrereqError("There are still %d node(s) in"
617                                  " this cluster." % (len(nodelist) - 1))
618     instancelist = self.cfg.GetInstanceList()
619     if instancelist:
620       raise errors.OpPrereqError("There are still %d instance(s) in"
621                                  " this cluster." % len(instancelist))
622
623   def Exec(self, feedback_fn):
624     """Destroys the cluster.
625
626     """
627     master = self.sstore.GetMasterNode()
628     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
629     utils.CreateBackup(priv_key)
630     utils.CreateBackup(pub_key)
631     rpc.call_node_leave_cluster(master)
632
633
634 class LUVerifyCluster(NoHooksLU):
635   """Verifies the cluster status.
636
637   """
638   _OP_REQP = []
639
640   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
641                   remote_version, feedback_fn):
642     """Run multiple tests against a node.
643
644     Test list:
645       - compares ganeti version
646       - checks vg existance and size > 20G
647       - checks config file checksum
648       - checks ssh to other nodes
649
650     Args:
651       node: name of the node to check
652       file_list: required list of files
653       local_cksum: dictionary of local files and their checksums
654
655     """
656     # compares ganeti version
657     local_version = constants.PROTOCOL_VERSION
658     if not remote_version:
659       feedback_fn(" - ERROR: connection to %s failed" % (node))
660       return True
661
662     if local_version != remote_version:
663       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
664                       (local_version, node, remote_version))
665       return True
666
667     # checks vg existance and size > 20G
668
669     bad = False
670     if not vglist:
671       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
672                       (node,))
673       bad = True
674     else:
675       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
676       if vgstatus:
677         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
678         bad = True
679
680     # checks config file checksum
681     # checks ssh to any
682
683     if 'filelist' not in node_result:
684       bad = True
685       feedback_fn("  - ERROR: node hasn't returned file checksum data")
686     else:
687       remote_cksum = node_result['filelist']
688       for file_name in file_list:
689         if file_name not in remote_cksum:
690           bad = True
691           feedback_fn("  - ERROR: file '%s' missing" % file_name)
692         elif remote_cksum[file_name] != local_cksum[file_name]:
693           bad = True
694           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
695
696     if 'nodelist' not in node_result:
697       bad = True
698       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
699     else:
700       if node_result['nodelist']:
701         bad = True
702         for node in node_result['nodelist']:
703           feedback_fn("  - ERROR: communication with node '%s': %s" %
704                           (node, node_result['nodelist'][node]))
705     hyp_result = node_result.get('hypervisor', None)
706     if hyp_result is not None:
707       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
708     return bad
709
710   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
711     """Verify an instance.
712
713     This function checks to see if the required block devices are
714     available on the instance's node.
715
716     """
717     bad = False
718
719     instancelist = self.cfg.GetInstanceList()
720     if not instance in instancelist:
721       feedback_fn("  - ERROR: instance %s not in instance list %s" %
722                       (instance, instancelist))
723       bad = True
724
725     instanceconfig = self.cfg.GetInstanceInfo(instance)
726     node_current = instanceconfig.primary_node
727
728     node_vol_should = {}
729     instanceconfig.MapLVsByNode(node_vol_should)
730
731     for node in node_vol_should:
732       for volume in node_vol_should[node]:
733         if node not in node_vol_is or volume not in node_vol_is[node]:
734           feedback_fn("  - ERROR: volume %s missing on node %s" %
735                           (volume, node))
736           bad = True
737
738     if not instanceconfig.status == 'down':
739       if not instance in node_instance[node_current]:
740         feedback_fn("  - ERROR: instance %s not running on node %s" %
741                         (instance, node_current))
742         bad = True
743
744     for node in node_instance:
745       if (not node == node_current):
746         if instance in node_instance[node]:
747           feedback_fn("  - ERROR: instance %s should not run on node %s" %
748                           (instance, node))
749           bad = True
750
751     return bad
752
753   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
754     """Verify if there are any unknown volumes in the cluster.
755
756     The .os, .swap and backup volumes are ignored. All other volumes are
757     reported as unknown.
758
759     """
760     bad = False
761
762     for node in node_vol_is:
763       for volume in node_vol_is[node]:
764         if node not in node_vol_should or volume not in node_vol_should[node]:
765           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
766                       (volume, node))
767           bad = True
768     return bad
769
770   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
771     """Verify the list of running instances.
772
773     This checks what instances are running but unknown to the cluster.
774
775     """
776     bad = False
777     for node in node_instance:
778       for runninginstance in node_instance[node]:
779         if runninginstance not in instancelist:
780           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
781                           (runninginstance, node))
782           bad = True
783     return bad
784
785   def CheckPrereq(self):
786     """Check prerequisites.
787
788     This has no prerequisites.
789
790     """
791     pass
792
793   def Exec(self, feedback_fn):
794     """Verify integrity of cluster, performing various test on nodes.
795
796     """
797     bad = False
798     feedback_fn("* Verifying global settings")
799     for msg in self.cfg.VerifyConfig():
800       feedback_fn("  - ERROR: %s" % msg)
801
802     vg_name = self.cfg.GetVGName()
803     nodelist = utils.NiceSort(self.cfg.GetNodeList())
804     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
805     node_volume = {}
806     node_instance = {}
807
808     # FIXME: verify OS list
809     # do local checksums
810     file_names = list(self.sstore.GetFileList())
811     file_names.append(constants.SSL_CERT_FILE)
812     file_names.append(constants.CLUSTER_CONF_FILE)
813     local_checksums = utils.FingerprintFiles(file_names)
814
815     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
816     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
817     all_instanceinfo = rpc.call_instance_list(nodelist)
818     all_vglist = rpc.call_vg_list(nodelist)
819     node_verify_param = {
820       'filelist': file_names,
821       'nodelist': nodelist,
822       'hypervisor': None,
823       }
824     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
825     all_rversion = rpc.call_version(nodelist)
826
827     for node in nodelist:
828       feedback_fn("* Verifying node %s" % node)
829       result = self._VerifyNode(node, file_names, local_checksums,
830                                 all_vglist[node], all_nvinfo[node],
831                                 all_rversion[node], feedback_fn)
832       bad = bad or result
833
834       # node_volume
835       volumeinfo = all_volumeinfo[node]
836
837       if type(volumeinfo) != dict:
838         feedback_fn("  - ERROR: connection to %s failed" % (node,))
839         bad = True
840         continue
841
842       node_volume[node] = volumeinfo
843
844       # node_instance
845       nodeinstance = all_instanceinfo[node]
846       if type(nodeinstance) != list:
847         feedback_fn("  - ERROR: connection to %s failed" % (node,))
848         bad = True
849         continue
850
851       node_instance[node] = nodeinstance
852
853     node_vol_should = {}
854
855     for instance in instancelist:
856       feedback_fn("* Verifying instance %s" % instance)
857       result =  self._VerifyInstance(instance, node_volume, node_instance,
858                                      feedback_fn)
859       bad = bad or result
860
861       inst_config = self.cfg.GetInstanceInfo(instance)
862
863       inst_config.MapLVsByNode(node_vol_should)
864
865     feedback_fn("* Verifying orphan volumes")
866     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
867                                        feedback_fn)
868     bad = bad or result
869
870     feedback_fn("* Verifying remaining instances")
871     result = self._VerifyOrphanInstances(instancelist, node_instance,
872                                          feedback_fn)
873     bad = bad or result
874
875     return int(bad)
876
877
878 class LURenameCluster(LogicalUnit):
879   """Rename the cluster.
880
881   """
882   HPATH = "cluster-rename"
883   HTYPE = constants.HTYPE_CLUSTER
884   _OP_REQP = ["name"]
885
886   def BuildHooksEnv(self):
887     """Build hooks env.
888
889     """
890     env = {
891       "OP_TARGET": self.op.sstore.GetClusterName(),
892       "NEW_NAME": self.op.name,
893       }
894     mn = self.sstore.GetMasterNode()
895     return env, [mn], [mn]
896
897   def CheckPrereq(self):
898     """Verify that the passed name is a valid one.
899
900     """
901     hostname = utils.HostInfo(self.op.name)
902
903     new_name = hostname.name
904     self.ip = new_ip = hostname.ip
905     old_name = self.sstore.GetClusterName()
906     old_ip = self.sstore.GetMasterIP()
907     if new_name == old_name and new_ip == old_ip:
908       raise errors.OpPrereqError("Neither the name nor the IP address of the"
909                                  " cluster has changed")
910     if new_ip != old_ip:
911       result = utils.RunCmd(["fping", "-q", new_ip])
912       if not result.failed:
913         raise errors.OpPrereqError("The given cluster IP address (%s) is"
914                                    " reachable on the network. Aborting." %
915                                    new_ip)
916
917     self.op.name = new_name
918
919   def Exec(self, feedback_fn):
920     """Rename the cluster.
921
922     """
923     clustername = self.op.name
924     ip = self.ip
925     ss = self.sstore
926
927     # shutdown the master IP
928     master = ss.GetMasterNode()
929     if not rpc.call_node_stop_master(master):
930       raise errors.OpExecError("Could not disable the master role")
931
932     try:
933       # modify the sstore
934       ss.SetKey(ss.SS_MASTER_IP, ip)
935       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
936
937       # Distribute updated ss config to all nodes
938       myself = self.cfg.GetNodeInfo(master)
939       dist_nodes = self.cfg.GetNodeList()
940       if myself.name in dist_nodes:
941         dist_nodes.remove(myself.name)
942
943       logger.Debug("Copying updated ssconf data to all nodes")
944       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
945         fname = ss.KeyToFilename(keyname)
946         result = rpc.call_upload_file(dist_nodes, fname)
947         for to_node in dist_nodes:
948           if not result[to_node]:
949             logger.Error("copy of file %s to node %s failed" %
950                          (fname, to_node))
951     finally:
952       if not rpc.call_node_start_master(master):
953         logger.Error("Could not re-enable the master role on the master,\n"
954                      "please restart manually.")
955
956
957 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
958   """Sleep and poll for an instance's disk to sync.
959
960   """
961   if not instance.disks:
962     return True
963
964   if not oneshot:
965     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
966
967   node = instance.primary_node
968
969   for dev in instance.disks:
970     cfgw.SetDiskID(dev, node)
971
972   retries = 0
973   while True:
974     max_time = 0
975     done = True
976     cumul_degraded = False
977     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
978     if not rstats:
979       proc.LogWarning("Can't get any data from node %s" % node)
980       retries += 1
981       if retries >= 10:
982         raise errors.RemoteError("Can't contact node %s for mirror data,"
983                                  " aborting." % node)
984       time.sleep(6)
985       continue
986     retries = 0
987     for i in range(len(rstats)):
988       mstat = rstats[i]
989       if mstat is None:
990         proc.LogWarning("Can't compute data for node %s/%s" %
991                         (node, instance.disks[i].iv_name))
992         continue
993       # we ignore the ldisk parameter
994       perc_done, est_time, is_degraded, _ = mstat
995       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
996       if perc_done is not None:
997         done = False
998         if est_time is not None:
999           rem_time = "%d estimated seconds remaining" % est_time
1000           max_time = est_time
1001         else:
1002           rem_time = "no time estimate"
1003         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1004                      (instance.disks[i].iv_name, perc_done, rem_time))
1005     if done or oneshot:
1006       break
1007
1008     if unlock:
1009       utils.Unlock('cmd')
1010     try:
1011       time.sleep(min(60, max_time))
1012     finally:
1013       if unlock:
1014         utils.Lock('cmd')
1015
1016   if done:
1017     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1018   return not cumul_degraded
1019
1020
1021 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1022   """Check that mirrors are not degraded.
1023
1024   The ldisk parameter, if True, will change the test from the
1025   is_degraded attribute (which represents overall non-ok status for
1026   the device(s)) to the ldisk (representing the local storage status).
1027
1028   """
1029   cfgw.SetDiskID(dev, node)
1030   if ldisk:
1031     idx = 6
1032   else:
1033     idx = 5
1034
1035   result = True
1036   if on_primary or dev.AssembleOnSecondary():
1037     rstats = rpc.call_blockdev_find(node, dev)
1038     if not rstats:
1039       logger.ToStderr("Can't get any data from node %s" % node)
1040       result = False
1041     else:
1042       result = result and (not rstats[idx])
1043   if dev.children:
1044     for child in dev.children:
1045       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1046
1047   return result
1048
1049
1050 class LUDiagnoseOS(NoHooksLU):
1051   """Logical unit for OS diagnose/query.
1052
1053   """
1054   _OP_REQP = []
1055
1056   def CheckPrereq(self):
1057     """Check prerequisites.
1058
1059     This always succeeds, since this is a pure query LU.
1060
1061     """
1062     return
1063
1064   def Exec(self, feedback_fn):
1065     """Compute the list of OSes.
1066
1067     """
1068     node_list = self.cfg.GetNodeList()
1069     node_data = rpc.call_os_diagnose(node_list)
1070     if node_data == False:
1071       raise errors.OpExecError("Can't gather the list of OSes")
1072     return node_data
1073
1074
1075 class LURemoveNode(LogicalUnit):
1076   """Logical unit for removing a node.
1077
1078   """
1079   HPATH = "node-remove"
1080   HTYPE = constants.HTYPE_NODE
1081   _OP_REQP = ["node_name"]
1082
1083   def BuildHooksEnv(self):
1084     """Build hooks env.
1085
1086     This doesn't run on the target node in the pre phase as a failed
1087     node would not allows itself to run.
1088
1089     """
1090     env = {
1091       "OP_TARGET": self.op.node_name,
1092       "NODE_NAME": self.op.node_name,
1093       }
1094     all_nodes = self.cfg.GetNodeList()
1095     all_nodes.remove(self.op.node_name)
1096     return env, all_nodes, all_nodes
1097
1098   def CheckPrereq(self):
1099     """Check prerequisites.
1100
1101     This checks:
1102      - the node exists in the configuration
1103      - it does not have primary or secondary instances
1104      - it's not the master
1105
1106     Any errors are signalled by raising errors.OpPrereqError.
1107
1108     """
1109     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1110     if node is None:
1111       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1112
1113     instance_list = self.cfg.GetInstanceList()
1114
1115     masternode = self.sstore.GetMasterNode()
1116     if node.name == masternode:
1117       raise errors.OpPrereqError("Node is the master node,"
1118                                  " you need to failover first.")
1119
1120     for instance_name in instance_list:
1121       instance = self.cfg.GetInstanceInfo(instance_name)
1122       if node.name == instance.primary_node:
1123         raise errors.OpPrereqError("Instance %s still running on the node,"
1124                                    " please remove first." % instance_name)
1125       if node.name in instance.secondary_nodes:
1126         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1127                                    " please remove first." % instance_name)
1128     self.op.node_name = node.name
1129     self.node = node
1130
1131   def Exec(self, feedback_fn):
1132     """Removes the node from the cluster.
1133
1134     """
1135     node = self.node
1136     logger.Info("stopping the node daemon and removing configs from node %s" %
1137                 node.name)
1138
1139     rpc.call_node_leave_cluster(node.name)
1140
1141     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1142
1143     logger.Info("Removing node %s from config" % node.name)
1144
1145     self.cfg.RemoveNode(node.name)
1146
1147     _RemoveHostFromEtcHosts(node.name)
1148
1149
1150 class LUQueryNodes(NoHooksLU):
1151   """Logical unit for querying nodes.
1152
1153   """
1154   _OP_REQP = ["output_fields", "names"]
1155
1156   def CheckPrereq(self):
1157     """Check prerequisites.
1158
1159     This checks that the fields required are valid output fields.
1160
1161     """
1162     self.dynamic_fields = frozenset(["dtotal", "dfree",
1163                                      "mtotal", "mnode", "mfree",
1164                                      "bootid"])
1165
1166     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1167                                "pinst_list", "sinst_list",
1168                                "pip", "sip"],
1169                        dynamic=self.dynamic_fields,
1170                        selected=self.op.output_fields)
1171
1172     self.wanted = _GetWantedNodes(self, self.op.names)
1173
1174   def Exec(self, feedback_fn):
1175     """Computes the list of nodes and their attributes.
1176
1177     """
1178     nodenames = self.wanted
1179     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1180
1181     # begin data gathering
1182
1183     if self.dynamic_fields.intersection(self.op.output_fields):
1184       live_data = {}
1185       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1186       for name in nodenames:
1187         nodeinfo = node_data.get(name, None)
1188         if nodeinfo:
1189           live_data[name] = {
1190             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1191             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1192             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1193             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1194             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1195             "bootid": nodeinfo['bootid'],
1196             }
1197         else:
1198           live_data[name] = {}
1199     else:
1200       live_data = dict.fromkeys(nodenames, {})
1201
1202     node_to_primary = dict([(name, set()) for name in nodenames])
1203     node_to_secondary = dict([(name, set()) for name in nodenames])
1204
1205     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1206                              "sinst_cnt", "sinst_list"))
1207     if inst_fields & frozenset(self.op.output_fields):
1208       instancelist = self.cfg.GetInstanceList()
1209
1210       for instance_name in instancelist:
1211         inst = self.cfg.GetInstanceInfo(instance_name)
1212         if inst.primary_node in node_to_primary:
1213           node_to_primary[inst.primary_node].add(inst.name)
1214         for secnode in inst.secondary_nodes:
1215           if secnode in node_to_secondary:
1216             node_to_secondary[secnode].add(inst.name)
1217
1218     # end data gathering
1219
1220     output = []
1221     for node in nodelist:
1222       node_output = []
1223       for field in self.op.output_fields:
1224         if field == "name":
1225           val = node.name
1226         elif field == "pinst_list":
1227           val = list(node_to_primary[node.name])
1228         elif field == "sinst_list":
1229           val = list(node_to_secondary[node.name])
1230         elif field == "pinst_cnt":
1231           val = len(node_to_primary[node.name])
1232         elif field == "sinst_cnt":
1233           val = len(node_to_secondary[node.name])
1234         elif field == "pip":
1235           val = node.primary_ip
1236         elif field == "sip":
1237           val = node.secondary_ip
1238         elif field in self.dynamic_fields:
1239           val = live_data[node.name].get(field, None)
1240         else:
1241           raise errors.ParameterError(field)
1242         node_output.append(val)
1243       output.append(node_output)
1244
1245     return output
1246
1247
1248 class LUQueryNodeVolumes(NoHooksLU):
1249   """Logical unit for getting volumes on node(s).
1250
1251   """
1252   _OP_REQP = ["nodes", "output_fields"]
1253
1254   def CheckPrereq(self):
1255     """Check prerequisites.
1256
1257     This checks that the fields required are valid output fields.
1258
1259     """
1260     self.nodes = _GetWantedNodes(self, self.op.nodes)
1261
1262     _CheckOutputFields(static=["node"],
1263                        dynamic=["phys", "vg", "name", "size", "instance"],
1264                        selected=self.op.output_fields)
1265
1266
1267   def Exec(self, feedback_fn):
1268     """Computes the list of nodes and their attributes.
1269
1270     """
1271     nodenames = self.nodes
1272     volumes = rpc.call_node_volumes(nodenames)
1273
1274     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1275              in self.cfg.GetInstanceList()]
1276
1277     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1278
1279     output = []
1280     for node in nodenames:
1281       if node not in volumes or not volumes[node]:
1282         continue
1283
1284       node_vols = volumes[node][:]
1285       node_vols.sort(key=lambda vol: vol['dev'])
1286
1287       for vol in node_vols:
1288         node_output = []
1289         for field in self.op.output_fields:
1290           if field == "node":
1291             val = node
1292           elif field == "phys":
1293             val = vol['dev']
1294           elif field == "vg":
1295             val = vol['vg']
1296           elif field == "name":
1297             val = vol['name']
1298           elif field == "size":
1299             val = int(float(vol['size']))
1300           elif field == "instance":
1301             for inst in ilist:
1302               if node not in lv_by_node[inst]:
1303                 continue
1304               if vol['name'] in lv_by_node[inst][node]:
1305                 val = inst.name
1306                 break
1307             else:
1308               val = '-'
1309           else:
1310             raise errors.ParameterError(field)
1311           node_output.append(str(val))
1312
1313         output.append(node_output)
1314
1315     return output
1316
1317
1318 class LUAddNode(LogicalUnit):
1319   """Logical unit for adding node to the cluster.
1320
1321   """
1322   HPATH = "node-add"
1323   HTYPE = constants.HTYPE_NODE
1324   _OP_REQP = ["node_name"]
1325
1326   def BuildHooksEnv(self):
1327     """Build hooks env.
1328
1329     This will run on all nodes before, and on all nodes + the new node after.
1330
1331     """
1332     env = {
1333       "OP_TARGET": self.op.node_name,
1334       "NODE_NAME": self.op.node_name,
1335       "NODE_PIP": self.op.primary_ip,
1336       "NODE_SIP": self.op.secondary_ip,
1337       }
1338     nodes_0 = self.cfg.GetNodeList()
1339     nodes_1 = nodes_0 + [self.op.node_name, ]
1340     return env, nodes_0, nodes_1
1341
1342   def CheckPrereq(self):
1343     """Check prerequisites.
1344
1345     This checks:
1346      - the new node is not already in the config
1347      - it is resolvable
1348      - its parameters (single/dual homed) matches the cluster
1349
1350     Any errors are signalled by raising errors.OpPrereqError.
1351
1352     """
1353     node_name = self.op.node_name
1354     cfg = self.cfg
1355
1356     dns_data = utils.HostInfo(node_name)
1357
1358     node = dns_data.name
1359     primary_ip = self.op.primary_ip = dns_data.ip
1360     secondary_ip = getattr(self.op, "secondary_ip", None)
1361     if secondary_ip is None:
1362       secondary_ip = primary_ip
1363     if not utils.IsValidIP(secondary_ip):
1364       raise errors.OpPrereqError("Invalid secondary IP given")
1365     self.op.secondary_ip = secondary_ip
1366     node_list = cfg.GetNodeList()
1367     if node in node_list:
1368       raise errors.OpPrereqError("Node %s is already in the configuration"
1369                                  % node)
1370
1371     for existing_node_name in node_list:
1372       existing_node = cfg.GetNodeInfo(existing_node_name)
1373       if (existing_node.primary_ip == primary_ip or
1374           existing_node.secondary_ip == primary_ip or
1375           existing_node.primary_ip == secondary_ip or
1376           existing_node.secondary_ip == secondary_ip):
1377         raise errors.OpPrereqError("New node ip address(es) conflict with"
1378                                    " existing node %s" % existing_node.name)
1379
1380     # check that the type of the node (single versus dual homed) is the
1381     # same as for the master
1382     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1383     master_singlehomed = myself.secondary_ip == myself.primary_ip
1384     newbie_singlehomed = secondary_ip == primary_ip
1385     if master_singlehomed != newbie_singlehomed:
1386       if master_singlehomed:
1387         raise errors.OpPrereqError("The master has no private ip but the"
1388                                    " new node has one")
1389       else:
1390         raise errors.OpPrereqError("The master has a private ip but the"
1391                                    " new node doesn't have one")
1392
1393     # checks reachablity
1394     if not utils.TcpPing(utils.HostInfo().name,
1395                          primary_ip,
1396                          constants.DEFAULT_NODED_PORT):
1397       raise errors.OpPrereqError("Node not reachable by ping")
1398
1399     if not newbie_singlehomed:
1400       # check reachability from my secondary ip to newbie's secondary ip
1401       if not utils.TcpPing(myself.secondary_ip,
1402                            secondary_ip,
1403                            constants.DEFAULT_NODED_PORT):
1404         raise errors.OpPrereqError(
1405           "Node secondary ip not reachable by TCP based ping to noded port")
1406
1407     self.new_node = objects.Node(name=node,
1408                                  primary_ip=primary_ip,
1409                                  secondary_ip=secondary_ip)
1410
1411   def Exec(self, feedback_fn):
1412     """Adds the new node to the cluster.
1413
1414     """
1415     new_node = self.new_node
1416     node = new_node.name
1417
1418     # set up inter-node password and certificate and restarts the node daemon
1419     gntpass = self.sstore.GetNodeDaemonPassword()
1420     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1421       raise errors.OpExecError("ganeti password corruption detected")
1422     f = open(constants.SSL_CERT_FILE)
1423     try:
1424       gntpem = f.read(8192)
1425     finally:
1426       f.close()
1427     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1428     # so we use this to detect an invalid certificate; as long as the
1429     # cert doesn't contain this, the here-document will be correctly
1430     # parsed by the shell sequence below
1431     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1432       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1433     if not gntpem.endswith("\n"):
1434       raise errors.OpExecError("PEM must end with newline")
1435     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1436
1437     # and then connect with ssh to set password and start ganeti-noded
1438     # note that all the below variables are sanitized at this point,
1439     # either by being constants or by the checks above
1440     ss = self.sstore
1441     mycommand = ("umask 077 && "
1442                  "echo '%s' > '%s' && "
1443                  "cat > '%s' << '!EOF.' && \n"
1444                  "%s!EOF.\n%s restart" %
1445                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1446                   constants.SSL_CERT_FILE, gntpem,
1447                   constants.NODE_INITD_SCRIPT))
1448
1449     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1450     if result.failed:
1451       raise errors.OpExecError("Remote command on node %s, error: %s,"
1452                                " output: %s" %
1453                                (node, result.fail_reason, result.output))
1454
1455     # check connectivity
1456     time.sleep(4)
1457
1458     result = rpc.call_version([node])[node]
1459     if result:
1460       if constants.PROTOCOL_VERSION == result:
1461         logger.Info("communication to node %s fine, sw version %s match" %
1462                     (node, result))
1463       else:
1464         raise errors.OpExecError("Version mismatch master version %s,"
1465                                  " node version %s" %
1466                                  (constants.PROTOCOL_VERSION, result))
1467     else:
1468       raise errors.OpExecError("Cannot get version from the new node")
1469
1470     # setup ssh on node
1471     logger.Info("copy ssh key to node %s" % node)
1472     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1473     keyarray = []
1474     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1475                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1476                 priv_key, pub_key]
1477
1478     for i in keyfiles:
1479       f = open(i, 'r')
1480       try:
1481         keyarray.append(f.read())
1482       finally:
1483         f.close()
1484
1485     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1486                                keyarray[3], keyarray[4], keyarray[5])
1487
1488     if not result:
1489       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1490
1491     # Add node to our /etc/hosts, and add key to known_hosts
1492     _AddHostToEtcHosts(new_node.name)
1493
1494     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1495                       self.cfg.GetHostKey())
1496
1497     if new_node.secondary_ip != new_node.primary_ip:
1498       if not rpc.call_node_tcp_ping(new_node.name,
1499                                     constants.LOCALHOST_IP_ADDRESS,
1500                                     new_node.secondary_ip,
1501                                     constants.DEFAULT_NODED_PORT,
1502                                     10, False):
1503         raise errors.OpExecError("Node claims it doesn't have the"
1504                                  " secondary ip you gave (%s).\n"
1505                                  "Please fix and re-run this command." %
1506                                  new_node.secondary_ip)
1507
1508     success, msg = ssh.VerifyNodeHostname(node)
1509     if not success:
1510       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1511                                " than the one the resolver gives: %s.\n"
1512                                "Please fix and re-run this command." %
1513                                (node, msg))
1514
1515     # Distribute updated /etc/hosts and known_hosts to all nodes,
1516     # including the node just added
1517     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1518     dist_nodes = self.cfg.GetNodeList() + [node]
1519     if myself.name in dist_nodes:
1520       dist_nodes.remove(myself.name)
1521
1522     logger.Debug("Copying hosts and known_hosts to all nodes")
1523     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1524       result = rpc.call_upload_file(dist_nodes, fname)
1525       for to_node in dist_nodes:
1526         if not result[to_node]:
1527           logger.Error("copy of file %s to node %s failed" %
1528                        (fname, to_node))
1529
1530     to_copy = ss.GetFileList()
1531     for fname in to_copy:
1532       if not ssh.CopyFileToNode(node, fname):
1533         logger.Error("could not copy file %s to node %s" % (fname, node))
1534
1535     logger.Info("adding node %s to cluster.conf" % node)
1536     self.cfg.AddNode(new_node)
1537
1538
1539 class LUMasterFailover(LogicalUnit):
1540   """Failover the master node to the current node.
1541
1542   This is a special LU in that it must run on a non-master node.
1543
1544   """
1545   HPATH = "master-failover"
1546   HTYPE = constants.HTYPE_CLUSTER
1547   REQ_MASTER = False
1548   _OP_REQP = []
1549
1550   def BuildHooksEnv(self):
1551     """Build hooks env.
1552
1553     This will run on the new master only in the pre phase, and on all
1554     the nodes in the post phase.
1555
1556     """
1557     env = {
1558       "OP_TARGET": self.new_master,
1559       "NEW_MASTER": self.new_master,
1560       "OLD_MASTER": self.old_master,
1561       }
1562     return env, [self.new_master], self.cfg.GetNodeList()
1563
1564   def CheckPrereq(self):
1565     """Check prerequisites.
1566
1567     This checks that we are not already the master.
1568
1569     """
1570     self.new_master = utils.HostInfo().name
1571     self.old_master = self.sstore.GetMasterNode()
1572
1573     if self.old_master == self.new_master:
1574       raise errors.OpPrereqError("This commands must be run on the node"
1575                                  " where you want the new master to be.\n"
1576                                  "%s is already the master" %
1577                                  self.old_master)
1578
1579   def Exec(self, feedback_fn):
1580     """Failover the master node.
1581
1582     This command, when run on a non-master node, will cause the current
1583     master to cease being master, and the non-master to become new
1584     master.
1585
1586     """
1587     #TODO: do not rely on gethostname returning the FQDN
1588     logger.Info("setting master to %s, old master: %s" %
1589                 (self.new_master, self.old_master))
1590
1591     if not rpc.call_node_stop_master(self.old_master):
1592       logger.Error("could disable the master role on the old master"
1593                    " %s, please disable manually" % self.old_master)
1594
1595     ss = self.sstore
1596     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1597     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1598                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1599       logger.Error("could not distribute the new simple store master file"
1600                    " to the other nodes, please check.")
1601
1602     if not rpc.call_node_start_master(self.new_master):
1603       logger.Error("could not start the master role on the new master"
1604                    " %s, please check" % self.new_master)
1605       feedback_fn("Error in activating the master IP on the new master,\n"
1606                   "please fix manually.")
1607
1608
1609
1610 class LUQueryClusterInfo(NoHooksLU):
1611   """Query cluster configuration.
1612
1613   """
1614   _OP_REQP = []
1615   REQ_MASTER = False
1616
1617   def CheckPrereq(self):
1618     """No prerequsites needed for this LU.
1619
1620     """
1621     pass
1622
1623   def Exec(self, feedback_fn):
1624     """Return cluster config.
1625
1626     """
1627     result = {
1628       "name": self.sstore.GetClusterName(),
1629       "software_version": constants.RELEASE_VERSION,
1630       "protocol_version": constants.PROTOCOL_VERSION,
1631       "config_version": constants.CONFIG_VERSION,
1632       "os_api_version": constants.OS_API_VERSION,
1633       "export_version": constants.EXPORT_VERSION,
1634       "master": self.sstore.GetMasterNode(),
1635       "architecture": (platform.architecture()[0], platform.machine()),
1636       }
1637
1638     return result
1639
1640
1641 class LUClusterCopyFile(NoHooksLU):
1642   """Copy file to cluster.
1643
1644   """
1645   _OP_REQP = ["nodes", "filename"]
1646
1647   def CheckPrereq(self):
1648     """Check prerequisites.
1649
1650     It should check that the named file exists and that the given list
1651     of nodes is valid.
1652
1653     """
1654     if not os.path.exists(self.op.filename):
1655       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1656
1657     self.nodes = _GetWantedNodes(self, self.op.nodes)
1658
1659   def Exec(self, feedback_fn):
1660     """Copy a file from master to some nodes.
1661
1662     Args:
1663       opts - class with options as members
1664       args - list containing a single element, the file name
1665     Opts used:
1666       nodes - list containing the name of target nodes; if empty, all nodes
1667
1668     """
1669     filename = self.op.filename
1670
1671     myname = utils.HostInfo().name
1672
1673     for node in self.nodes:
1674       if node == myname:
1675         continue
1676       if not ssh.CopyFileToNode(node, filename):
1677         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1678
1679
1680 class LUDumpClusterConfig(NoHooksLU):
1681   """Return a text-representation of the cluster-config.
1682
1683   """
1684   _OP_REQP = []
1685
1686   def CheckPrereq(self):
1687     """No prerequisites.
1688
1689     """
1690     pass
1691
1692   def Exec(self, feedback_fn):
1693     """Dump a representation of the cluster config to the standard output.
1694
1695     """
1696     return self.cfg.DumpConfig()
1697
1698
1699 class LURunClusterCommand(NoHooksLU):
1700   """Run a command on some nodes.
1701
1702   """
1703   _OP_REQP = ["command", "nodes"]
1704
1705   def CheckPrereq(self):
1706     """Check prerequisites.
1707
1708     It checks that the given list of nodes is valid.
1709
1710     """
1711     self.nodes = _GetWantedNodes(self, self.op.nodes)
1712
1713   def Exec(self, feedback_fn):
1714     """Run a command on some nodes.
1715
1716     """
1717     data = []
1718     for node in self.nodes:
1719       result = ssh.SSHCall(node, "root", self.op.command)
1720       data.append((node, result.output, result.exit_code))
1721
1722     return data
1723
1724
1725 class LUActivateInstanceDisks(NoHooksLU):
1726   """Bring up an instance's disks.
1727
1728   """
1729   _OP_REQP = ["instance_name"]
1730
1731   def CheckPrereq(self):
1732     """Check prerequisites.
1733
1734     This checks that the instance is in the cluster.
1735
1736     """
1737     instance = self.cfg.GetInstanceInfo(
1738       self.cfg.ExpandInstanceName(self.op.instance_name))
1739     if instance is None:
1740       raise errors.OpPrereqError("Instance '%s' not known" %
1741                                  self.op.instance_name)
1742     self.instance = instance
1743
1744
1745   def Exec(self, feedback_fn):
1746     """Activate the disks.
1747
1748     """
1749     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1750     if not disks_ok:
1751       raise errors.OpExecError("Cannot activate block devices")
1752
1753     return disks_info
1754
1755
1756 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1757   """Prepare the block devices for an instance.
1758
1759   This sets up the block devices on all nodes.
1760
1761   Args:
1762     instance: a ganeti.objects.Instance object
1763     ignore_secondaries: if true, errors on secondary nodes won't result
1764                         in an error return from the function
1765
1766   Returns:
1767     false if the operation failed
1768     list of (host, instance_visible_name, node_visible_name) if the operation
1769          suceeded with the mapping from node devices to instance devices
1770   """
1771   device_info = []
1772   disks_ok = True
1773   for inst_disk in instance.disks:
1774     master_result = None
1775     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1776       cfg.SetDiskID(node_disk, node)
1777       is_primary = node == instance.primary_node
1778       result = rpc.call_blockdev_assemble(node, node_disk,
1779                                           instance.name, is_primary)
1780       if not result:
1781         logger.Error("could not prepare block device %s on node %s (is_pri"
1782                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1783         if is_primary or not ignore_secondaries:
1784           disks_ok = False
1785       if is_primary:
1786         master_result = result
1787     device_info.append((instance.primary_node, inst_disk.iv_name,
1788                         master_result))
1789
1790   # leave the disks configured for the primary node
1791   # this is a workaround that would be fixed better by
1792   # improving the logical/physical id handling
1793   for disk in instance.disks:
1794     cfg.SetDiskID(disk, instance.primary_node)
1795
1796   return disks_ok, device_info
1797
1798
1799 def _StartInstanceDisks(cfg, instance, force):
1800   """Start the disks of an instance.
1801
1802   """
1803   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1804                                            ignore_secondaries=force)
1805   if not disks_ok:
1806     _ShutdownInstanceDisks(instance, cfg)
1807     if force is not None and not force:
1808       logger.Error("If the message above refers to a secondary node,"
1809                    " you can retry the operation using '--force'.")
1810     raise errors.OpExecError("Disk consistency error")
1811
1812
1813 class LUDeactivateInstanceDisks(NoHooksLU):
1814   """Shutdown an instance's disks.
1815
1816   """
1817   _OP_REQP = ["instance_name"]
1818
1819   def CheckPrereq(self):
1820     """Check prerequisites.
1821
1822     This checks that the instance is in the cluster.
1823
1824     """
1825     instance = self.cfg.GetInstanceInfo(
1826       self.cfg.ExpandInstanceName(self.op.instance_name))
1827     if instance is None:
1828       raise errors.OpPrereqError("Instance '%s' not known" %
1829                                  self.op.instance_name)
1830     self.instance = instance
1831
1832   def Exec(self, feedback_fn):
1833     """Deactivate the disks
1834
1835     """
1836     instance = self.instance
1837     ins_l = rpc.call_instance_list([instance.primary_node])
1838     ins_l = ins_l[instance.primary_node]
1839     if not type(ins_l) is list:
1840       raise errors.OpExecError("Can't contact node '%s'" %
1841                                instance.primary_node)
1842
1843     if self.instance.name in ins_l:
1844       raise errors.OpExecError("Instance is running, can't shutdown"
1845                                " block devices.")
1846
1847     _ShutdownInstanceDisks(instance, self.cfg)
1848
1849
1850 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1851   """Shutdown block devices of an instance.
1852
1853   This does the shutdown on all nodes of the instance.
1854
1855   If the ignore_primary is false, errors on the primary node are
1856   ignored.
1857
1858   """
1859   result = True
1860   for disk in instance.disks:
1861     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1862       cfg.SetDiskID(top_disk, node)
1863       if not rpc.call_blockdev_shutdown(node, top_disk):
1864         logger.Error("could not shutdown block device %s on node %s" %
1865                      (disk.iv_name, node))
1866         if not ignore_primary or node != instance.primary_node:
1867           result = False
1868   return result
1869
1870
1871 class LUStartupInstance(LogicalUnit):
1872   """Starts an instance.
1873
1874   """
1875   HPATH = "instance-start"
1876   HTYPE = constants.HTYPE_INSTANCE
1877   _OP_REQP = ["instance_name", "force"]
1878
1879   def BuildHooksEnv(self):
1880     """Build hooks env.
1881
1882     This runs on master, primary and secondary nodes of the instance.
1883
1884     """
1885     env = {
1886       "FORCE": self.op.force,
1887       }
1888     env.update(_BuildInstanceHookEnvByObject(self.instance))
1889     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1890           list(self.instance.secondary_nodes))
1891     return env, nl, nl
1892
1893   def CheckPrereq(self):
1894     """Check prerequisites.
1895
1896     This checks that the instance is in the cluster.
1897
1898     """
1899     instance = self.cfg.GetInstanceInfo(
1900       self.cfg.ExpandInstanceName(self.op.instance_name))
1901     if instance is None:
1902       raise errors.OpPrereqError("Instance '%s' not known" %
1903                                  self.op.instance_name)
1904
1905     # check bridges existance
1906     _CheckInstanceBridgesExist(instance)
1907
1908     self.instance = instance
1909     self.op.instance_name = instance.name
1910
1911   def Exec(self, feedback_fn):
1912     """Start the instance.
1913
1914     """
1915     instance = self.instance
1916     force = self.op.force
1917     extra_args = getattr(self.op, "extra_args", "")
1918
1919     node_current = instance.primary_node
1920
1921     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1922     if not nodeinfo:
1923       raise errors.OpExecError("Could not contact node %s for infos" %
1924                                (node_current))
1925
1926     freememory = nodeinfo[node_current]['memory_free']
1927     memory = instance.memory
1928     if memory > freememory:
1929       raise errors.OpExecError("Not enough memory to start instance"
1930                                " %s on node %s"
1931                                " needed %s MiB, available %s MiB" %
1932                                (instance.name, node_current, memory,
1933                                 freememory))
1934
1935     _StartInstanceDisks(self.cfg, instance, force)
1936
1937     if not rpc.call_instance_start(node_current, instance, extra_args):
1938       _ShutdownInstanceDisks(instance, self.cfg)
1939       raise errors.OpExecError("Could not start instance")
1940
1941     self.cfg.MarkInstanceUp(instance.name)
1942
1943
1944 class LURebootInstance(LogicalUnit):
1945   """Reboot an instance.
1946
1947   """
1948   HPATH = "instance-reboot"
1949   HTYPE = constants.HTYPE_INSTANCE
1950   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
1951
1952   def BuildHooksEnv(self):
1953     """Build hooks env.
1954
1955     This runs on master, primary and secondary nodes of the instance.
1956
1957     """
1958     env = {
1959       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
1960       }
1961     env.update(_BuildInstanceHookEnvByObject(self.instance))
1962     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1963           list(self.instance.secondary_nodes))
1964     return env, nl, nl
1965
1966   def CheckPrereq(self):
1967     """Check prerequisites.
1968
1969     This checks that the instance is in the cluster.
1970
1971     """
1972     instance = self.cfg.GetInstanceInfo(
1973       self.cfg.ExpandInstanceName(self.op.instance_name))
1974     if instance is None:
1975       raise errors.OpPrereqError("Instance '%s' not known" %
1976                                  self.op.instance_name)
1977
1978     # check bridges existance
1979     _CheckInstanceBridgesExist(instance)
1980
1981     self.instance = instance
1982     self.op.instance_name = instance.name
1983
1984   def Exec(self, feedback_fn):
1985     """Reboot the instance.
1986
1987     """
1988     instance = self.instance
1989     ignore_secondaries = self.op.ignore_secondaries
1990     reboot_type = self.op.reboot_type
1991     extra_args = getattr(self.op, "extra_args", "")
1992
1993     node_current = instance.primary_node
1994
1995     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
1996                            constants.INSTANCE_REBOOT_HARD,
1997                            constants.INSTANCE_REBOOT_FULL]:
1998       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
1999                                   (constants.INSTANCE_REBOOT_SOFT,
2000                                    constants.INSTANCE_REBOOT_HARD,
2001                                    constants.INSTANCE_REBOOT_FULL))
2002
2003     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2004                        constants.INSTANCE_REBOOT_HARD]:
2005       if not rpc.call_instance_reboot(node_current, instance,
2006                                       reboot_type, extra_args):
2007         raise errors.OpExecError("Could not reboot instance")
2008     else:
2009       if not rpc.call_instance_shutdown(node_current, instance):
2010         raise errors.OpExecError("could not shutdown instance for full reboot")
2011       _ShutdownInstanceDisks(instance, self.cfg)
2012       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2013       if not rpc.call_instance_start(node_current, instance, extra_args):
2014         _ShutdownInstanceDisks(instance, self.cfg)
2015         raise errors.OpExecError("Could not start instance for full reboot")
2016
2017     self.cfg.MarkInstanceUp(instance.name)
2018
2019
2020 class LUShutdownInstance(LogicalUnit):
2021   """Shutdown an instance.
2022
2023   """
2024   HPATH = "instance-stop"
2025   HTYPE = constants.HTYPE_INSTANCE
2026   _OP_REQP = ["instance_name"]
2027
2028   def BuildHooksEnv(self):
2029     """Build hooks env.
2030
2031     This runs on master, primary and secondary nodes of the instance.
2032
2033     """
2034     env = _BuildInstanceHookEnvByObject(self.instance)
2035     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2036           list(self.instance.secondary_nodes))
2037     return env, nl, nl
2038
2039   def CheckPrereq(self):
2040     """Check prerequisites.
2041
2042     This checks that the instance is in the cluster.
2043
2044     """
2045     instance = self.cfg.GetInstanceInfo(
2046       self.cfg.ExpandInstanceName(self.op.instance_name))
2047     if instance is None:
2048       raise errors.OpPrereqError("Instance '%s' not known" %
2049                                  self.op.instance_name)
2050     self.instance = instance
2051
2052   def Exec(self, feedback_fn):
2053     """Shutdown the instance.
2054
2055     """
2056     instance = self.instance
2057     node_current = instance.primary_node
2058     if not rpc.call_instance_shutdown(node_current, instance):
2059       logger.Error("could not shutdown instance")
2060
2061     self.cfg.MarkInstanceDown(instance.name)
2062     _ShutdownInstanceDisks(instance, self.cfg)
2063
2064
2065 class LUReinstallInstance(LogicalUnit):
2066   """Reinstall an instance.
2067
2068   """
2069   HPATH = "instance-reinstall"
2070   HTYPE = constants.HTYPE_INSTANCE
2071   _OP_REQP = ["instance_name"]
2072
2073   def BuildHooksEnv(self):
2074     """Build hooks env.
2075
2076     This runs on master, primary and secondary nodes of the instance.
2077
2078     """
2079     env = _BuildInstanceHookEnvByObject(self.instance)
2080     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2081           list(self.instance.secondary_nodes))
2082     return env, nl, nl
2083
2084   def CheckPrereq(self):
2085     """Check prerequisites.
2086
2087     This checks that the instance is in the cluster and is not running.
2088
2089     """
2090     instance = self.cfg.GetInstanceInfo(
2091       self.cfg.ExpandInstanceName(self.op.instance_name))
2092     if instance is None:
2093       raise errors.OpPrereqError("Instance '%s' not known" %
2094                                  self.op.instance_name)
2095     if instance.disk_template == constants.DT_DISKLESS:
2096       raise errors.OpPrereqError("Instance '%s' has no disks" %
2097                                  self.op.instance_name)
2098     if instance.status != "down":
2099       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2100                                  self.op.instance_name)
2101     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2102     if remote_info:
2103       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2104                                  (self.op.instance_name,
2105                                   instance.primary_node))
2106
2107     self.op.os_type = getattr(self.op, "os_type", None)
2108     if self.op.os_type is not None:
2109       # OS verification
2110       pnode = self.cfg.GetNodeInfo(
2111         self.cfg.ExpandNodeName(instance.primary_node))
2112       if pnode is None:
2113         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2114                                    self.op.pnode)
2115       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2116       if not os_obj:
2117         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2118                                    " primary node"  % self.op.os_type)
2119
2120     self.instance = instance
2121
2122   def Exec(self, feedback_fn):
2123     """Reinstall the instance.
2124
2125     """
2126     inst = self.instance
2127
2128     if self.op.os_type is not None:
2129       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2130       inst.os = self.op.os_type
2131       self.cfg.AddInstance(inst)
2132
2133     _StartInstanceDisks(self.cfg, inst, None)
2134     try:
2135       feedback_fn("Running the instance OS create scripts...")
2136       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2137         raise errors.OpExecError("Could not install OS for instance %s "
2138                                  "on node %s" %
2139                                  (inst.name, inst.primary_node))
2140     finally:
2141       _ShutdownInstanceDisks(inst, self.cfg)
2142
2143
2144 class LURenameInstance(LogicalUnit):
2145   """Rename an instance.
2146
2147   """
2148   HPATH = "instance-rename"
2149   HTYPE = constants.HTYPE_INSTANCE
2150   _OP_REQP = ["instance_name", "new_name"]
2151
2152   def BuildHooksEnv(self):
2153     """Build hooks env.
2154
2155     This runs on master, primary and secondary nodes of the instance.
2156
2157     """
2158     env = _BuildInstanceHookEnvByObject(self.instance)
2159     env["INSTANCE_NEW_NAME"] = self.op.new_name
2160     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2161           list(self.instance.secondary_nodes))
2162     return env, nl, nl
2163
2164   def CheckPrereq(self):
2165     """Check prerequisites.
2166
2167     This checks that the instance is in the cluster and is not running.
2168
2169     """
2170     instance = self.cfg.GetInstanceInfo(
2171       self.cfg.ExpandInstanceName(self.op.instance_name))
2172     if instance is None:
2173       raise errors.OpPrereqError("Instance '%s' not known" %
2174                                  self.op.instance_name)
2175     if instance.status != "down":
2176       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2177                                  self.op.instance_name)
2178     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2179     if remote_info:
2180       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2181                                  (self.op.instance_name,
2182                                   instance.primary_node))
2183     self.instance = instance
2184
2185     # new name verification
2186     name_info = utils.HostInfo(self.op.new_name)
2187
2188     self.op.new_name = new_name = name_info.name
2189     if not getattr(self.op, "ignore_ip", False):
2190       command = ["fping", "-q", name_info.ip]
2191       result = utils.RunCmd(command)
2192       if not result.failed:
2193         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2194                                    (name_info.ip, new_name))
2195
2196
2197   def Exec(self, feedback_fn):
2198     """Reinstall the instance.
2199
2200     """
2201     inst = self.instance
2202     old_name = inst.name
2203
2204     self.cfg.RenameInstance(inst.name, self.op.new_name)
2205
2206     # re-read the instance from the configuration after rename
2207     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2208
2209     _StartInstanceDisks(self.cfg, inst, None)
2210     try:
2211       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2212                                           "sda", "sdb"):
2213         msg = ("Could run OS rename script for instance %s\n"
2214                "on node %s\n"
2215                "(but the instance has been renamed in Ganeti)" %
2216                (inst.name, inst.primary_node))
2217         logger.Error(msg)
2218     finally:
2219       _ShutdownInstanceDisks(inst, self.cfg)
2220
2221
2222 class LURemoveInstance(LogicalUnit):
2223   """Remove an instance.
2224
2225   """
2226   HPATH = "instance-remove"
2227   HTYPE = constants.HTYPE_INSTANCE
2228   _OP_REQP = ["instance_name"]
2229
2230   def BuildHooksEnv(self):
2231     """Build hooks env.
2232
2233     This runs on master, primary and secondary nodes of the instance.
2234
2235     """
2236     env = _BuildInstanceHookEnvByObject(self.instance)
2237     nl = [self.sstore.GetMasterNode()]
2238     return env, nl, nl
2239
2240   def CheckPrereq(self):
2241     """Check prerequisites.
2242
2243     This checks that the instance is in the cluster.
2244
2245     """
2246     instance = self.cfg.GetInstanceInfo(
2247       self.cfg.ExpandInstanceName(self.op.instance_name))
2248     if instance is None:
2249       raise errors.OpPrereqError("Instance '%s' not known" %
2250                                  self.op.instance_name)
2251     self.instance = instance
2252
2253   def Exec(self, feedback_fn):
2254     """Remove the instance.
2255
2256     """
2257     instance = self.instance
2258     logger.Info("shutting down instance %s on node %s" %
2259                 (instance.name, instance.primary_node))
2260
2261     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2262       if self.op.ignore_failures:
2263         feedback_fn("Warning: can't shutdown instance")
2264       else:
2265         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2266                                  (instance.name, instance.primary_node))
2267
2268     logger.Info("removing block devices for instance %s" % instance.name)
2269
2270     if not _RemoveDisks(instance, self.cfg):
2271       if self.op.ignore_failures:
2272         feedback_fn("Warning: can't remove instance's disks")
2273       else:
2274         raise errors.OpExecError("Can't remove instance's disks")
2275
2276     logger.Info("removing instance %s out of cluster config" % instance.name)
2277
2278     self.cfg.RemoveInstance(instance.name)
2279
2280
2281 class LUQueryInstances(NoHooksLU):
2282   """Logical unit for querying instances.
2283
2284   """
2285   _OP_REQP = ["output_fields", "names"]
2286
2287   def CheckPrereq(self):
2288     """Check prerequisites.
2289
2290     This checks that the fields required are valid output fields.
2291
2292     """
2293     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2294     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2295                                "admin_state", "admin_ram",
2296                                "disk_template", "ip", "mac", "bridge",
2297                                "sda_size", "sdb_size"],
2298                        dynamic=self.dynamic_fields,
2299                        selected=self.op.output_fields)
2300
2301     self.wanted = _GetWantedInstances(self, self.op.names)
2302
2303   def Exec(self, feedback_fn):
2304     """Computes the list of nodes and their attributes.
2305
2306     """
2307     instance_names = self.wanted
2308     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2309                      in instance_names]
2310
2311     # begin data gathering
2312
2313     nodes = frozenset([inst.primary_node for inst in instance_list])
2314
2315     bad_nodes = []
2316     if self.dynamic_fields.intersection(self.op.output_fields):
2317       live_data = {}
2318       node_data = rpc.call_all_instances_info(nodes)
2319       for name in nodes:
2320         result = node_data[name]
2321         if result:
2322           live_data.update(result)
2323         elif result == False:
2324           bad_nodes.append(name)
2325         # else no instance is alive
2326     else:
2327       live_data = dict([(name, {}) for name in instance_names])
2328
2329     # end data gathering
2330
2331     output = []
2332     for instance in instance_list:
2333       iout = []
2334       for field in self.op.output_fields:
2335         if field == "name":
2336           val = instance.name
2337         elif field == "os":
2338           val = instance.os
2339         elif field == "pnode":
2340           val = instance.primary_node
2341         elif field == "snodes":
2342           val = list(instance.secondary_nodes)
2343         elif field == "admin_state":
2344           val = (instance.status != "down")
2345         elif field == "oper_state":
2346           if instance.primary_node in bad_nodes:
2347             val = None
2348           else:
2349             val = bool(live_data.get(instance.name))
2350         elif field == "admin_ram":
2351           val = instance.memory
2352         elif field == "oper_ram":
2353           if instance.primary_node in bad_nodes:
2354             val = None
2355           elif instance.name in live_data:
2356             val = live_data[instance.name].get("memory", "?")
2357           else:
2358             val = "-"
2359         elif field == "disk_template":
2360           val = instance.disk_template
2361         elif field == "ip":
2362           val = instance.nics[0].ip
2363         elif field == "bridge":
2364           val = instance.nics[0].bridge
2365         elif field == "mac":
2366           val = instance.nics[0].mac
2367         elif field == "sda_size" or field == "sdb_size":
2368           disk = instance.FindDisk(field[:3])
2369           if disk is None:
2370             val = None
2371           else:
2372             val = disk.size
2373         else:
2374           raise errors.ParameterError(field)
2375         iout.append(val)
2376       output.append(iout)
2377
2378     return output
2379
2380
2381 class LUFailoverInstance(LogicalUnit):
2382   """Failover an instance.
2383
2384   """
2385   HPATH = "instance-failover"
2386   HTYPE = constants.HTYPE_INSTANCE
2387   _OP_REQP = ["instance_name", "ignore_consistency"]
2388
2389   def BuildHooksEnv(self):
2390     """Build hooks env.
2391
2392     This runs on master, primary and secondary nodes of the instance.
2393
2394     """
2395     env = {
2396       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2397       }
2398     env.update(_BuildInstanceHookEnvByObject(self.instance))
2399     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2400     return env, nl, nl
2401
2402   def CheckPrereq(self):
2403     """Check prerequisites.
2404
2405     This checks that the instance is in the cluster.
2406
2407     """
2408     instance = self.cfg.GetInstanceInfo(
2409       self.cfg.ExpandInstanceName(self.op.instance_name))
2410     if instance is None:
2411       raise errors.OpPrereqError("Instance '%s' not known" %
2412                                  self.op.instance_name)
2413
2414     if instance.disk_template not in constants.DTS_NET_MIRROR:
2415       raise errors.OpPrereqError("Instance's disk layout is not"
2416                                  " network mirrored, cannot failover.")
2417
2418     secondary_nodes = instance.secondary_nodes
2419     if not secondary_nodes:
2420       raise errors.ProgrammerError("no secondary node but using "
2421                                    "DT_REMOTE_RAID1 template")
2422
2423     # check memory requirements on the secondary node
2424     target_node = secondary_nodes[0]
2425     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2426     info = nodeinfo.get(target_node, None)
2427     if not info:
2428       raise errors.OpPrereqError("Cannot get current information"
2429                                  " from node '%s'" % nodeinfo)
2430     if instance.memory > info['memory_free']:
2431       raise errors.OpPrereqError("Not enough memory on target node %s."
2432                                  " %d MB available, %d MB required" %
2433                                  (target_node, info['memory_free'],
2434                                   instance.memory))
2435
2436     # check bridge existance
2437     brlist = [nic.bridge for nic in instance.nics]
2438     if not rpc.call_bridges_exist(target_node, brlist):
2439       raise errors.OpPrereqError("One or more target bridges %s does not"
2440                                  " exist on destination node '%s'" %
2441                                  (brlist, target_node))
2442
2443     self.instance = instance
2444
2445   def Exec(self, feedback_fn):
2446     """Failover an instance.
2447
2448     The failover is done by shutting it down on its present node and
2449     starting it on the secondary.
2450
2451     """
2452     instance = self.instance
2453
2454     source_node = instance.primary_node
2455     target_node = instance.secondary_nodes[0]
2456
2457     feedback_fn("* checking disk consistency between source and target")
2458     for dev in instance.disks:
2459       # for remote_raid1, these are md over drbd
2460       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2461         if not self.op.ignore_consistency:
2462           raise errors.OpExecError("Disk %s is degraded on target node,"
2463                                    " aborting failover." % dev.iv_name)
2464
2465     feedback_fn("* checking target node resource availability")
2466     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2467
2468     if not nodeinfo:
2469       raise errors.OpExecError("Could not contact target node %s." %
2470                                target_node)
2471
2472     free_memory = int(nodeinfo[target_node]['memory_free'])
2473     memory = instance.memory
2474     if memory > free_memory:
2475       raise errors.OpExecError("Not enough memory to create instance %s on"
2476                                " node %s. needed %s MiB, available %s MiB" %
2477                                (instance.name, target_node, memory,
2478                                 free_memory))
2479
2480     feedback_fn("* shutting down instance on source node")
2481     logger.Info("Shutting down instance %s on node %s" %
2482                 (instance.name, source_node))
2483
2484     if not rpc.call_instance_shutdown(source_node, instance):
2485       if self.op.ignore_consistency:
2486         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2487                      " anyway. Please make sure node %s is down"  %
2488                      (instance.name, source_node, source_node))
2489       else:
2490         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2491                                  (instance.name, source_node))
2492
2493     feedback_fn("* deactivating the instance's disks on source node")
2494     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2495       raise errors.OpExecError("Can't shut down the instance's disks.")
2496
2497     instance.primary_node = target_node
2498     # distribute new instance config to the other nodes
2499     self.cfg.AddInstance(instance)
2500
2501     feedback_fn("* activating the instance's disks on target node")
2502     logger.Info("Starting instance %s on node %s" %
2503                 (instance.name, target_node))
2504
2505     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2506                                              ignore_secondaries=True)
2507     if not disks_ok:
2508       _ShutdownInstanceDisks(instance, self.cfg)
2509       raise errors.OpExecError("Can't activate the instance's disks")
2510
2511     feedback_fn("* starting the instance on the target node")
2512     if not rpc.call_instance_start(target_node, instance, None):
2513       _ShutdownInstanceDisks(instance, self.cfg)
2514       raise errors.OpExecError("Could not start instance %s on node %s." %
2515                                (instance.name, target_node))
2516
2517
2518 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2519   """Create a tree of block devices on the primary node.
2520
2521   This always creates all devices.
2522
2523   """
2524   if device.children:
2525     for child in device.children:
2526       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2527         return False
2528
2529   cfg.SetDiskID(device, node)
2530   new_id = rpc.call_blockdev_create(node, device, device.size,
2531                                     instance.name, True, info)
2532   if not new_id:
2533     return False
2534   if device.physical_id is None:
2535     device.physical_id = new_id
2536   return True
2537
2538
2539 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2540   """Create a tree of block devices on a secondary node.
2541
2542   If this device type has to be created on secondaries, create it and
2543   all its children.
2544
2545   If not, just recurse to children keeping the same 'force' value.
2546
2547   """
2548   if device.CreateOnSecondary():
2549     force = True
2550   if device.children:
2551     for child in device.children:
2552       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2553                                         child, force, info):
2554         return False
2555
2556   if not force:
2557     return True
2558   cfg.SetDiskID(device, node)
2559   new_id = rpc.call_blockdev_create(node, device, device.size,
2560                                     instance.name, False, info)
2561   if not new_id:
2562     return False
2563   if device.physical_id is None:
2564     device.physical_id = new_id
2565   return True
2566
2567
2568 def _GenerateUniqueNames(cfg, exts):
2569   """Generate a suitable LV name.
2570
2571   This will generate a logical volume name for the given instance.
2572
2573   """
2574   results = []
2575   for val in exts:
2576     new_id = cfg.GenerateUniqueID()
2577     results.append("%s%s" % (new_id, val))
2578   return results
2579
2580
2581 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2582   """Generate a drbd device complete with its children.
2583
2584   """
2585   port = cfg.AllocatePort()
2586   vgname = cfg.GetVGName()
2587   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2588                           logical_id=(vgname, names[0]))
2589   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2590                           logical_id=(vgname, names[1]))
2591   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2592                           logical_id = (primary, secondary, port),
2593                           children = [dev_data, dev_meta])
2594   return drbd_dev
2595
2596
2597 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2598   """Generate a drbd8 device complete with its children.
2599
2600   """
2601   port = cfg.AllocatePort()
2602   vgname = cfg.GetVGName()
2603   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2604                           logical_id=(vgname, names[0]))
2605   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2606                           logical_id=(vgname, names[1]))
2607   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2608                           logical_id = (primary, secondary, port),
2609                           children = [dev_data, dev_meta],
2610                           iv_name=iv_name)
2611   return drbd_dev
2612
2613 def _GenerateDiskTemplate(cfg, template_name,
2614                           instance_name, primary_node,
2615                           secondary_nodes, disk_sz, swap_sz):
2616   """Generate the entire disk layout for a given template type.
2617
2618   """
2619   #TODO: compute space requirements
2620
2621   vgname = cfg.GetVGName()
2622   if template_name == "diskless":
2623     disks = []
2624   elif template_name == "plain":
2625     if len(secondary_nodes) != 0:
2626       raise errors.ProgrammerError("Wrong template configuration")
2627
2628     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2629     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2630                            logical_id=(vgname, names[0]),
2631                            iv_name = "sda")
2632     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2633                            logical_id=(vgname, names[1]),
2634                            iv_name = "sdb")
2635     disks = [sda_dev, sdb_dev]
2636   elif template_name == "local_raid1":
2637     if len(secondary_nodes) != 0:
2638       raise errors.ProgrammerError("Wrong template configuration")
2639
2640
2641     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2642                                        ".sdb_m1", ".sdb_m2"])
2643     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2644                               logical_id=(vgname, names[0]))
2645     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2646                               logical_id=(vgname, names[1]))
2647     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2648                               size=disk_sz,
2649                               children = [sda_dev_m1, sda_dev_m2])
2650     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2651                               logical_id=(vgname, names[2]))
2652     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2653                               logical_id=(vgname, names[3]))
2654     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2655                               size=swap_sz,
2656                               children = [sdb_dev_m1, sdb_dev_m2])
2657     disks = [md_sda_dev, md_sdb_dev]
2658   elif template_name == constants.DT_REMOTE_RAID1:
2659     if len(secondary_nodes) != 1:
2660       raise errors.ProgrammerError("Wrong template configuration")
2661     remote_node = secondary_nodes[0]
2662     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2663                                        ".sdb_data", ".sdb_meta"])
2664     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2665                                          disk_sz, names[0:2])
2666     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2667                               children = [drbd_sda_dev], size=disk_sz)
2668     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2669                                          swap_sz, names[2:4])
2670     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2671                               children = [drbd_sdb_dev], size=swap_sz)
2672     disks = [md_sda_dev, md_sdb_dev]
2673   elif template_name == constants.DT_DRBD8:
2674     if len(secondary_nodes) != 1:
2675       raise errors.ProgrammerError("Wrong template configuration")
2676     remote_node = secondary_nodes[0]
2677     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2678                                        ".sdb_data", ".sdb_meta"])
2679     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2680                                          disk_sz, names[0:2], "sda")
2681     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2682                                          swap_sz, names[2:4], "sdb")
2683     disks = [drbd_sda_dev, drbd_sdb_dev]
2684   else:
2685     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2686   return disks
2687
2688
2689 def _GetInstanceInfoText(instance):
2690   """Compute that text that should be added to the disk's metadata.
2691
2692   """
2693   return "originstname+%s" % instance.name
2694
2695
2696 def _CreateDisks(cfg, instance):
2697   """Create all disks for an instance.
2698
2699   This abstracts away some work from AddInstance.
2700
2701   Args:
2702     instance: the instance object
2703
2704   Returns:
2705     True or False showing the success of the creation process
2706
2707   """
2708   info = _GetInstanceInfoText(instance)
2709
2710   for device in instance.disks:
2711     logger.Info("creating volume %s for instance %s" %
2712               (device.iv_name, instance.name))
2713     #HARDCODE
2714     for secondary_node in instance.secondary_nodes:
2715       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2716                                         device, False, info):
2717         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2718                      (device.iv_name, device, secondary_node))
2719         return False
2720     #HARDCODE
2721     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2722                                     instance, device, info):
2723       logger.Error("failed to create volume %s on primary!" %
2724                    device.iv_name)
2725       return False
2726   return True
2727
2728
2729 def _RemoveDisks(instance, cfg):
2730   """Remove all disks for an instance.
2731
2732   This abstracts away some work from `AddInstance()` and
2733   `RemoveInstance()`. Note that in case some of the devices couldn't
2734   be removed, the removal will continue with the other ones (compare
2735   with `_CreateDisks()`).
2736
2737   Args:
2738     instance: the instance object
2739
2740   Returns:
2741     True or False showing the success of the removal proces
2742
2743   """
2744   logger.Info("removing block devices for instance %s" % instance.name)
2745
2746   result = True
2747   for device in instance.disks:
2748     for node, disk in device.ComputeNodeTree(instance.primary_node):
2749       cfg.SetDiskID(disk, node)
2750       if not rpc.call_blockdev_remove(node, disk):
2751         logger.Error("could not remove block device %s on node %s,"
2752                      " continuing anyway" %
2753                      (device.iv_name, node))
2754         result = False
2755   return result
2756
2757
2758 class LUCreateInstance(LogicalUnit):
2759   """Create an instance.
2760
2761   """
2762   HPATH = "instance-add"
2763   HTYPE = constants.HTYPE_INSTANCE
2764   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2765               "disk_template", "swap_size", "mode", "start", "vcpus",
2766               "wait_for_sync", "ip_check"]
2767
2768   def BuildHooksEnv(self):
2769     """Build hooks env.
2770
2771     This runs on master, primary and secondary nodes of the instance.
2772
2773     """
2774     env = {
2775       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2776       "INSTANCE_DISK_SIZE": self.op.disk_size,
2777       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2778       "INSTANCE_ADD_MODE": self.op.mode,
2779       }
2780     if self.op.mode == constants.INSTANCE_IMPORT:
2781       env["INSTANCE_SRC_NODE"] = self.op.src_node
2782       env["INSTANCE_SRC_PATH"] = self.op.src_path
2783       env["INSTANCE_SRC_IMAGE"] = self.src_image
2784
2785     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2786       primary_node=self.op.pnode,
2787       secondary_nodes=self.secondaries,
2788       status=self.instance_status,
2789       os_type=self.op.os_type,
2790       memory=self.op.mem_size,
2791       vcpus=self.op.vcpus,
2792       nics=[(self.inst_ip, self.op.bridge)],
2793     ))
2794
2795     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2796           self.secondaries)
2797     return env, nl, nl
2798
2799
2800   def CheckPrereq(self):
2801     """Check prerequisites.
2802
2803     """
2804     if self.op.mode not in (constants.INSTANCE_CREATE,
2805                             constants.INSTANCE_IMPORT):
2806       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2807                                  self.op.mode)
2808
2809     if self.op.mode == constants.INSTANCE_IMPORT:
2810       src_node = getattr(self.op, "src_node", None)
2811       src_path = getattr(self.op, "src_path", None)
2812       if src_node is None or src_path is None:
2813         raise errors.OpPrereqError("Importing an instance requires source"
2814                                    " node and path options")
2815       src_node_full = self.cfg.ExpandNodeName(src_node)
2816       if src_node_full is None:
2817         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2818       self.op.src_node = src_node = src_node_full
2819
2820       if not os.path.isabs(src_path):
2821         raise errors.OpPrereqError("The source path must be absolute")
2822
2823       export_info = rpc.call_export_info(src_node, src_path)
2824
2825       if not export_info:
2826         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2827
2828       if not export_info.has_section(constants.INISECT_EXP):
2829         raise errors.ProgrammerError("Corrupted export config")
2830
2831       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2832       if (int(ei_version) != constants.EXPORT_VERSION):
2833         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2834                                    (ei_version, constants.EXPORT_VERSION))
2835
2836       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2837         raise errors.OpPrereqError("Can't import instance with more than"
2838                                    " one data disk")
2839
2840       # FIXME: are the old os-es, disk sizes, etc. useful?
2841       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2842       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2843                                                          'disk0_dump'))
2844       self.src_image = diskimage
2845     else: # INSTANCE_CREATE
2846       if getattr(self.op, "os_type", None) is None:
2847         raise errors.OpPrereqError("No guest OS specified")
2848
2849     # check primary node
2850     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2851     if pnode is None:
2852       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2853                                  self.op.pnode)
2854     self.op.pnode = pnode.name
2855     self.pnode = pnode
2856     self.secondaries = []
2857     # disk template and mirror node verification
2858     if self.op.disk_template not in constants.DISK_TEMPLATES:
2859       raise errors.OpPrereqError("Invalid disk template name")
2860
2861     if self.op.disk_template in constants.DTS_NET_MIRROR:
2862       if getattr(self.op, "snode", None) is None:
2863         raise errors.OpPrereqError("The networked disk templates need"
2864                                    " a mirror node")
2865
2866       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2867       if snode_name is None:
2868         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2869                                    self.op.snode)
2870       elif snode_name == pnode.name:
2871         raise errors.OpPrereqError("The secondary node cannot be"
2872                                    " the primary node.")
2873       self.secondaries.append(snode_name)
2874
2875     # Check lv size requirements
2876     nodenames = [pnode.name] + self.secondaries
2877     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2878
2879     # Required free disk space as a function of disk and swap space
2880     req_size_dict = {
2881       constants.DT_DISKLESS: 0,
2882       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2883       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2884       # 256 MB are added for drbd metadata, 128MB for each drbd device
2885       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2886       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2887     }
2888
2889     if self.op.disk_template not in req_size_dict:
2890       raise errors.ProgrammerError("Disk template '%s' size requirement"
2891                                    " is unknown" %  self.op.disk_template)
2892
2893     req_size = req_size_dict[self.op.disk_template]
2894
2895     for node in nodenames:
2896       info = nodeinfo.get(node, None)
2897       if not info:
2898         raise errors.OpPrereqError("Cannot get current information"
2899                                    " from node '%s'" % nodeinfo)
2900       if req_size > info['vg_free']:
2901         raise errors.OpPrereqError("Not enough disk space on target node %s."
2902                                    " %d MB available, %d MB required" %
2903                                    (node, info['vg_free'], req_size))
2904
2905     # os verification
2906     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2907     if not os_obj:
2908       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2909                                  " primary node"  % self.op.os_type)
2910
2911     # instance verification
2912     hostname1 = utils.HostInfo(self.op.instance_name)
2913
2914     self.op.instance_name = instance_name = hostname1.name
2915     instance_list = self.cfg.GetInstanceList()
2916     if instance_name in instance_list:
2917       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2918                                  instance_name)
2919
2920     ip = getattr(self.op, "ip", None)
2921     if ip is None or ip.lower() == "none":
2922       inst_ip = None
2923     elif ip.lower() == "auto":
2924       inst_ip = hostname1.ip
2925     else:
2926       if not utils.IsValidIP(ip):
2927         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2928                                    " like a valid IP" % ip)
2929       inst_ip = ip
2930     self.inst_ip = inst_ip
2931
2932     if self.op.start and not self.op.ip_check:
2933       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2934                                  " adding an instance in start mode")
2935
2936     if self.op.ip_check:
2937       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2938                        constants.DEFAULT_NODED_PORT):
2939         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2940                                    (hostname1.ip, instance_name))
2941
2942     # bridge verification
2943     bridge = getattr(self.op, "bridge", None)
2944     if bridge is None:
2945       self.op.bridge = self.cfg.GetDefBridge()
2946     else:
2947       self.op.bridge = bridge
2948
2949     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2950       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2951                                  " destination node '%s'" %
2952                                  (self.op.bridge, pnode.name))
2953
2954     if self.op.start:
2955       self.instance_status = 'up'
2956     else:
2957       self.instance_status = 'down'
2958
2959   def Exec(self, feedback_fn):
2960     """Create and add the instance to the cluster.
2961
2962     """
2963     instance = self.op.instance_name
2964     pnode_name = self.pnode.name
2965
2966     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2967     if self.inst_ip is not None:
2968       nic.ip = self.inst_ip
2969
2970     disks = _GenerateDiskTemplate(self.cfg,
2971                                   self.op.disk_template,
2972                                   instance, pnode_name,
2973                                   self.secondaries, self.op.disk_size,
2974                                   self.op.swap_size)
2975
2976     iobj = objects.Instance(name=instance, os=self.op.os_type,
2977                             primary_node=pnode_name,
2978                             memory=self.op.mem_size,
2979                             vcpus=self.op.vcpus,
2980                             nics=[nic], disks=disks,
2981                             disk_template=self.op.disk_template,
2982                             status=self.instance_status,
2983                             )
2984
2985     feedback_fn("* creating instance disks...")
2986     if not _CreateDisks(self.cfg, iobj):
2987       _RemoveDisks(iobj, self.cfg)
2988       raise errors.OpExecError("Device creation failed, reverting...")
2989
2990     feedback_fn("adding instance %s to cluster config" % instance)
2991
2992     self.cfg.AddInstance(iobj)
2993
2994     if self.op.wait_for_sync:
2995       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
2996     elif iobj.disk_template in constants.DTS_NET_MIRROR:
2997       # make sure the disks are not degraded (still sync-ing is ok)
2998       time.sleep(15)
2999       feedback_fn("* checking mirrors status")
3000       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3001     else:
3002       disk_abort = False
3003
3004     if disk_abort:
3005       _RemoveDisks(iobj, self.cfg)
3006       self.cfg.RemoveInstance(iobj.name)
3007       raise errors.OpExecError("There are some degraded disks for"
3008                                " this instance")
3009
3010     feedback_fn("creating os for instance %s on node %s" %
3011                 (instance, pnode_name))
3012
3013     if iobj.disk_template != constants.DT_DISKLESS:
3014       if self.op.mode == constants.INSTANCE_CREATE:
3015         feedback_fn("* running the instance OS create scripts...")
3016         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3017           raise errors.OpExecError("could not add os for instance %s"
3018                                    " on node %s" %
3019                                    (instance, pnode_name))
3020
3021       elif self.op.mode == constants.INSTANCE_IMPORT:
3022         feedback_fn("* running the instance OS import scripts...")
3023         src_node = self.op.src_node
3024         src_image = self.src_image
3025         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3026                                                 src_node, src_image):
3027           raise errors.OpExecError("Could not import os for instance"
3028                                    " %s on node %s" %
3029                                    (instance, pnode_name))
3030       else:
3031         # also checked in the prereq part
3032         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3033                                      % self.op.mode)
3034
3035     if self.op.start:
3036       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3037       feedback_fn("* starting instance...")
3038       if not rpc.call_instance_start(pnode_name, iobj, None):
3039         raise errors.OpExecError("Could not start instance")
3040
3041
3042 class LUConnectConsole(NoHooksLU):
3043   """Connect to an instance's console.
3044
3045   This is somewhat special in that it returns the command line that
3046   you need to run on the master node in order to connect to the
3047   console.
3048
3049   """
3050   _OP_REQP = ["instance_name"]
3051
3052   def CheckPrereq(self):
3053     """Check prerequisites.
3054
3055     This checks that the instance is in the cluster.
3056
3057     """
3058     instance = self.cfg.GetInstanceInfo(
3059       self.cfg.ExpandInstanceName(self.op.instance_name))
3060     if instance is None:
3061       raise errors.OpPrereqError("Instance '%s' not known" %
3062                                  self.op.instance_name)
3063     self.instance = instance
3064
3065   def Exec(self, feedback_fn):
3066     """Connect to the console of an instance
3067
3068     """
3069     instance = self.instance
3070     node = instance.primary_node
3071
3072     node_insts = rpc.call_instance_list([node])[node]
3073     if node_insts is False:
3074       raise errors.OpExecError("Can't connect to node %s." % node)
3075
3076     if instance.name not in node_insts:
3077       raise errors.OpExecError("Instance %s is not running." % instance.name)
3078
3079     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3080
3081     hyper = hypervisor.GetHypervisor()
3082     console_cmd = hyper.GetShellCommandForConsole(instance.name)
3083     # build ssh cmdline
3084     argv = ["ssh", "-q", "-t"]
3085     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3086     argv.extend(ssh.BATCH_MODE_OPTS)
3087     argv.append(node)
3088     argv.append(console_cmd)
3089     return "ssh", argv
3090
3091
3092 class LUAddMDDRBDComponent(LogicalUnit):
3093   """Adda new mirror member to an instance's disk.
3094
3095   """
3096   HPATH = "mirror-add"
3097   HTYPE = constants.HTYPE_INSTANCE
3098   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3099
3100   def BuildHooksEnv(self):
3101     """Build hooks env.
3102
3103     This runs on the master, the primary and all the secondaries.
3104
3105     """
3106     env = {
3107       "NEW_SECONDARY": self.op.remote_node,
3108       "DISK_NAME": self.op.disk_name,
3109       }
3110     env.update(_BuildInstanceHookEnvByObject(self.instance))
3111     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3112           self.op.remote_node,] + list(self.instance.secondary_nodes)
3113     return env, nl, nl
3114
3115   def CheckPrereq(self):
3116     """Check prerequisites.
3117
3118     This checks that the instance is in the cluster.
3119
3120     """
3121     instance = self.cfg.GetInstanceInfo(
3122       self.cfg.ExpandInstanceName(self.op.instance_name))
3123     if instance is None:
3124       raise errors.OpPrereqError("Instance '%s' not known" %
3125                                  self.op.instance_name)
3126     self.instance = instance
3127
3128     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3129     if remote_node is None:
3130       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3131     self.remote_node = remote_node
3132
3133     if remote_node == instance.primary_node:
3134       raise errors.OpPrereqError("The specified node is the primary node of"
3135                                  " the instance.")
3136
3137     if instance.disk_template != constants.DT_REMOTE_RAID1:
3138       raise errors.OpPrereqError("Instance's disk layout is not"
3139                                  " remote_raid1.")
3140     for disk in instance.disks:
3141       if disk.iv_name == self.op.disk_name:
3142         break
3143     else:
3144       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3145                                  " instance." % self.op.disk_name)
3146     if len(disk.children) > 1:
3147       raise errors.OpPrereqError("The device already has two slave"
3148                                  " devices.\n"
3149                                  "This would create a 3-disk raid1"
3150                                  " which we don't allow.")
3151     self.disk = disk
3152
3153   def Exec(self, feedback_fn):
3154     """Add the mirror component
3155
3156     """
3157     disk = self.disk
3158     instance = self.instance
3159
3160     remote_node = self.remote_node
3161     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3162     names = _GenerateUniqueNames(self.cfg, lv_names)
3163     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3164                                      remote_node, disk.size, names)
3165
3166     logger.Info("adding new mirror component on secondary")
3167     #HARDCODE
3168     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3169                                       new_drbd, False,
3170                                       _GetInstanceInfoText(instance)):
3171       raise errors.OpExecError("Failed to create new component on secondary"
3172                                " node %s" % remote_node)
3173
3174     logger.Info("adding new mirror component on primary")
3175     #HARDCODE
3176     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3177                                     instance, new_drbd,
3178                                     _GetInstanceInfoText(instance)):
3179       # remove secondary dev
3180       self.cfg.SetDiskID(new_drbd, remote_node)
3181       rpc.call_blockdev_remove(remote_node, new_drbd)
3182       raise errors.OpExecError("Failed to create volume on primary")
3183
3184     # the device exists now
3185     # call the primary node to add the mirror to md
3186     logger.Info("adding new mirror component to md")
3187     if not rpc.call_blockdev_addchildren(instance.primary_node,
3188                                          disk, [new_drbd]):
3189       logger.Error("Can't add mirror compoment to md!")
3190       self.cfg.SetDiskID(new_drbd, remote_node)
3191       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3192         logger.Error("Can't rollback on secondary")
3193       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3194       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3195         logger.Error("Can't rollback on primary")
3196       raise errors.OpExecError("Can't add mirror component to md array")
3197
3198     disk.children.append(new_drbd)
3199
3200     self.cfg.AddInstance(instance)
3201
3202     _WaitForSync(self.cfg, instance, self.proc)
3203
3204     return 0
3205
3206
3207 class LURemoveMDDRBDComponent(LogicalUnit):
3208   """Remove a component from a remote_raid1 disk.
3209
3210   """
3211   HPATH = "mirror-remove"
3212   HTYPE = constants.HTYPE_INSTANCE
3213   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3214
3215   def BuildHooksEnv(self):
3216     """Build hooks env.
3217
3218     This runs on the master, the primary and all the secondaries.
3219
3220     """
3221     env = {
3222       "DISK_NAME": self.op.disk_name,
3223       "DISK_ID": self.op.disk_id,
3224       "OLD_SECONDARY": self.old_secondary,
3225       }
3226     env.update(_BuildInstanceHookEnvByObject(self.instance))
3227     nl = [self.sstore.GetMasterNode(),
3228           self.instance.primary_node] + list(self.instance.secondary_nodes)
3229     return env, nl, nl
3230
3231   def CheckPrereq(self):
3232     """Check prerequisites.
3233
3234     This checks that the instance is in the cluster.
3235
3236     """
3237     instance = self.cfg.GetInstanceInfo(
3238       self.cfg.ExpandInstanceName(self.op.instance_name))
3239     if instance is None:
3240       raise errors.OpPrereqError("Instance '%s' not known" %
3241                                  self.op.instance_name)
3242     self.instance = instance
3243
3244     if instance.disk_template != constants.DT_REMOTE_RAID1:
3245       raise errors.OpPrereqError("Instance's disk layout is not"
3246                                  " remote_raid1.")
3247     for disk in instance.disks:
3248       if disk.iv_name == self.op.disk_name:
3249         break
3250     else:
3251       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3252                                  " instance." % self.op.disk_name)
3253     for child in disk.children:
3254       if (child.dev_type == constants.LD_DRBD7 and
3255           child.logical_id[2] == self.op.disk_id):
3256         break
3257     else:
3258       raise errors.OpPrereqError("Can't find the device with this port.")
3259
3260     if len(disk.children) < 2:
3261       raise errors.OpPrereqError("Cannot remove the last component from"
3262                                  " a mirror.")
3263     self.disk = disk
3264     self.child = child
3265     if self.child.logical_id[0] == instance.primary_node:
3266       oid = 1
3267     else:
3268       oid = 0
3269     self.old_secondary = self.child.logical_id[oid]
3270
3271   def Exec(self, feedback_fn):
3272     """Remove the mirror component
3273
3274     """
3275     instance = self.instance
3276     disk = self.disk
3277     child = self.child
3278     logger.Info("remove mirror component")
3279     self.cfg.SetDiskID(disk, instance.primary_node)
3280     if not rpc.call_blockdev_removechildren(instance.primary_node,
3281                                             disk, [child]):
3282       raise errors.OpExecError("Can't remove child from mirror.")
3283
3284     for node in child.logical_id[:2]:
3285       self.cfg.SetDiskID(child, node)
3286       if not rpc.call_blockdev_remove(node, child):
3287         logger.Error("Warning: failed to remove device from node %s,"
3288                      " continuing operation." % node)
3289
3290     disk.children.remove(child)
3291     self.cfg.AddInstance(instance)
3292
3293
3294 class LUReplaceDisks(LogicalUnit):
3295   """Replace the disks of an instance.
3296
3297   """
3298   HPATH = "mirrors-replace"
3299   HTYPE = constants.HTYPE_INSTANCE
3300   _OP_REQP = ["instance_name", "mode", "disks"]
3301
3302   def BuildHooksEnv(self):
3303     """Build hooks env.
3304
3305     This runs on the master, the primary and all the secondaries.
3306
3307     """
3308     env = {
3309       "MODE": self.op.mode,
3310       "NEW_SECONDARY": self.op.remote_node,
3311       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3312       }
3313     env.update(_BuildInstanceHookEnvByObject(self.instance))
3314     nl = [
3315       self.sstore.GetMasterNode(),
3316       self.instance.primary_node,
3317       ]
3318     if self.op.remote_node is not None:
3319       nl.append(self.op.remote_node)
3320     return env, nl, nl
3321
3322   def CheckPrereq(self):
3323     """Check prerequisites.
3324
3325     This checks that the instance is in the cluster.
3326
3327     """
3328     instance = self.cfg.GetInstanceInfo(
3329       self.cfg.ExpandInstanceName(self.op.instance_name))
3330     if instance is None:
3331       raise errors.OpPrereqError("Instance '%s' not known" %
3332                                  self.op.instance_name)
3333     self.instance = instance
3334     self.op.instance_name = instance.name
3335
3336     if instance.disk_template not in constants.DTS_NET_MIRROR:
3337       raise errors.OpPrereqError("Instance's disk layout is not"
3338                                  " network mirrored.")
3339
3340     if len(instance.secondary_nodes) != 1:
3341       raise errors.OpPrereqError("The instance has a strange layout,"
3342                                  " expected one secondary but found %d" %
3343                                  len(instance.secondary_nodes))
3344
3345     self.sec_node = instance.secondary_nodes[0]
3346
3347     remote_node = getattr(self.op, "remote_node", None)
3348     if remote_node is not None:
3349       remote_node = self.cfg.ExpandNodeName(remote_node)
3350       if remote_node is None:
3351         raise errors.OpPrereqError("Node '%s' not known" %
3352                                    self.op.remote_node)
3353       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3354     else:
3355       self.remote_node_info = None
3356     if remote_node == instance.primary_node:
3357       raise errors.OpPrereqError("The specified node is the primary node of"
3358                                  " the instance.")
3359     elif remote_node == self.sec_node:
3360       if self.op.mode == constants.REPLACE_DISK_SEC:
3361         # this is for DRBD8, where we can't execute the same mode of
3362         # replacement as for drbd7 (no different port allocated)
3363         raise errors.OpPrereqError("Same secondary given, cannot execute"
3364                                    " replacement")
3365       # the user gave the current secondary, switch to
3366       # 'no-replace-secondary' mode for drbd7
3367       remote_node = None
3368     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3369         self.op.mode != constants.REPLACE_DISK_ALL):
3370       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3371                                  " disks replacement, not individual ones")
3372     if instance.disk_template == constants.DT_DRBD8:
3373       if (self.op.mode == constants.REPLACE_DISK_ALL and
3374           remote_node is not None):
3375         # switch to replace secondary mode
3376         self.op.mode = constants.REPLACE_DISK_SEC
3377
3378       if self.op.mode == constants.REPLACE_DISK_ALL:
3379         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3380                                    " secondary disk replacement, not"
3381                                    " both at once")
3382       elif self.op.mode == constants.REPLACE_DISK_PRI:
3383         if remote_node is not None:
3384           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3385                                      " the secondary while doing a primary"
3386                                      " node disk replacement")
3387         self.tgt_node = instance.primary_node
3388         self.oth_node = instance.secondary_nodes[0]
3389       elif self.op.mode == constants.REPLACE_DISK_SEC:
3390         self.new_node = remote_node # this can be None, in which case
3391                                     # we don't change the secondary
3392         self.tgt_node = instance.secondary_nodes[0]
3393         self.oth_node = instance.primary_node
3394       else:
3395         raise errors.ProgrammerError("Unhandled disk replace mode")
3396
3397     for name in self.op.disks:
3398       if instance.FindDisk(name) is None:
3399         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3400                                    (name, instance.name))
3401     self.op.remote_node = remote_node
3402
3403   def _ExecRR1(self, feedback_fn):
3404     """Replace the disks of an instance.
3405
3406     """
3407     instance = self.instance
3408     iv_names = {}
3409     # start of work
3410     if self.op.remote_node is None:
3411       remote_node = self.sec_node
3412     else:
3413       remote_node = self.op.remote_node
3414     cfg = self.cfg
3415     for dev in instance.disks:
3416       size = dev.size
3417       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3418       names = _GenerateUniqueNames(cfg, lv_names)
3419       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3420                                        remote_node, size, names)
3421       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3422       logger.Info("adding new mirror component on secondary for %s" %
3423                   dev.iv_name)
3424       #HARDCODE
3425       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3426                                         new_drbd, False,
3427                                         _GetInstanceInfoText(instance)):
3428         raise errors.OpExecError("Failed to create new component on"
3429                                  " secondary node %s\n"
3430                                  "Full abort, cleanup manually!" %
3431                                  remote_node)
3432
3433       logger.Info("adding new mirror component on primary")
3434       #HARDCODE
3435       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3436                                       instance, new_drbd,
3437                                       _GetInstanceInfoText(instance)):
3438         # remove secondary dev
3439         cfg.SetDiskID(new_drbd, remote_node)
3440         rpc.call_blockdev_remove(remote_node, new_drbd)
3441         raise errors.OpExecError("Failed to create volume on primary!\n"
3442                                  "Full abort, cleanup manually!!")
3443
3444       # the device exists now
3445       # call the primary node to add the mirror to md
3446       logger.Info("adding new mirror component to md")
3447       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3448                                            [new_drbd]):
3449         logger.Error("Can't add mirror compoment to md!")
3450         cfg.SetDiskID(new_drbd, remote_node)
3451         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3452           logger.Error("Can't rollback on secondary")
3453         cfg.SetDiskID(new_drbd, instance.primary_node)
3454         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3455           logger.Error("Can't rollback on primary")
3456         raise errors.OpExecError("Full abort, cleanup manually!!")
3457
3458       dev.children.append(new_drbd)
3459       cfg.AddInstance(instance)
3460
3461     # this can fail as the old devices are degraded and _WaitForSync
3462     # does a combined result over all disks, so we don't check its
3463     # return value
3464     _WaitForSync(cfg, instance, self.proc, unlock=True)
3465
3466     # so check manually all the devices
3467     for name in iv_names:
3468       dev, child, new_drbd = iv_names[name]
3469       cfg.SetDiskID(dev, instance.primary_node)
3470       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3471       if is_degr:
3472         raise errors.OpExecError("MD device %s is degraded!" % name)
3473       cfg.SetDiskID(new_drbd, instance.primary_node)
3474       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3475       if is_degr:
3476         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3477
3478     for name in iv_names:
3479       dev, child, new_drbd = iv_names[name]
3480       logger.Info("remove mirror %s component" % name)
3481       cfg.SetDiskID(dev, instance.primary_node)
3482       if not rpc.call_blockdev_removechildren(instance.primary_node,
3483                                               dev, [child]):
3484         logger.Error("Can't remove child from mirror, aborting"
3485                      " *this device cleanup*.\nYou need to cleanup manually!!")
3486         continue
3487
3488       for node in child.logical_id[:2]:
3489         logger.Info("remove child device on %s" % node)
3490         cfg.SetDiskID(child, node)
3491         if not rpc.call_blockdev_remove(node, child):
3492           logger.Error("Warning: failed to remove device from node %s,"
3493                        " continuing operation." % node)
3494
3495       dev.children.remove(child)
3496
3497       cfg.AddInstance(instance)
3498
3499   def _ExecD8DiskOnly(self, feedback_fn):
3500     """Replace a disk on the primary or secondary for dbrd8.
3501
3502     The algorithm for replace is quite complicated:
3503       - for each disk to be replaced:
3504         - create new LVs on the target node with unique names
3505         - detach old LVs from the drbd device
3506         - rename old LVs to name_replaced.<time_t>
3507         - rename new LVs to old LVs
3508         - attach the new LVs (with the old names now) to the drbd device
3509       - wait for sync across all devices
3510       - for each modified disk:
3511         - remove old LVs (which have the name name_replaces.<time_t>)
3512
3513     Failures are not very well handled.
3514
3515     """
3516     steps_total = 6
3517     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3518     instance = self.instance
3519     iv_names = {}
3520     vgname = self.cfg.GetVGName()
3521     # start of work
3522     cfg = self.cfg
3523     tgt_node = self.tgt_node
3524     oth_node = self.oth_node
3525
3526     # Step: check device activation
3527     self.proc.LogStep(1, steps_total, "check device existence")
3528     info("checking volume groups")
3529     my_vg = cfg.GetVGName()
3530     results = rpc.call_vg_list([oth_node, tgt_node])
3531     if not results:
3532       raise errors.OpExecError("Can't list volume groups on the nodes")
3533     for node in oth_node, tgt_node:
3534       res = results.get(node, False)
3535       if not res or my_vg not in res:
3536         raise errors.OpExecError("Volume group '%s' not found on %s" %
3537                                  (my_vg, node))
3538     for dev in instance.disks:
3539       if not dev.iv_name in self.op.disks:
3540         continue
3541       for node in tgt_node, oth_node:
3542         info("checking %s on %s" % (dev.iv_name, node))
3543         cfg.SetDiskID(dev, node)
3544         if not rpc.call_blockdev_find(node, dev):
3545           raise errors.OpExecError("Can't find device %s on node %s" %
3546                                    (dev.iv_name, node))
3547
3548     # Step: check other node consistency
3549     self.proc.LogStep(2, steps_total, "check peer consistency")
3550     for dev in instance.disks:
3551       if not dev.iv_name in self.op.disks:
3552         continue
3553       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3554       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3555                                    oth_node==instance.primary_node):
3556         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3557                                  " to replace disks on this node (%s)" %
3558                                  (oth_node, tgt_node))
3559
3560     # Step: create new storage
3561     self.proc.LogStep(3, steps_total, "allocate new storage")
3562     for dev in instance.disks:
3563       if not dev.iv_name in self.op.disks:
3564         continue
3565       size = dev.size
3566       cfg.SetDiskID(dev, tgt_node)
3567       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3568       names = _GenerateUniqueNames(cfg, lv_names)
3569       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3570                              logical_id=(vgname, names[0]))
3571       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3572                              logical_id=(vgname, names[1]))
3573       new_lvs = [lv_data, lv_meta]
3574       old_lvs = dev.children
3575       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3576       info("creating new local storage on %s for %s" %
3577            (tgt_node, dev.iv_name))
3578       # since we *always* want to create this LV, we use the
3579       # _Create...OnPrimary (which forces the creation), even if we
3580       # are talking about the secondary node
3581       for new_lv in new_lvs:
3582         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3583                                         _GetInstanceInfoText(instance)):
3584           raise errors.OpExecError("Failed to create new LV named '%s' on"
3585                                    " node '%s'" %
3586                                    (new_lv.logical_id[1], tgt_node))
3587
3588     # Step: for each lv, detach+rename*2+attach
3589     self.proc.LogStep(4, steps_total, "change drbd configuration")
3590     for dev, old_lvs, new_lvs in iv_names.itervalues():
3591       info("detaching %s drbd from local storage" % dev.iv_name)
3592       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3593         raise errors.OpExecError("Can't detach drbd from local storage on node"
3594                                  " %s for device %s" % (tgt_node, dev.iv_name))
3595       #dev.children = []
3596       #cfg.Update(instance)
3597
3598       # ok, we created the new LVs, so now we know we have the needed
3599       # storage; as such, we proceed on the target node to rename
3600       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3601       # using the assumption than logical_id == physical_id (which in
3602       # turn is the unique_id on that node)
3603
3604       # FIXME(iustin): use a better name for the replaced LVs
3605       temp_suffix = int(time.time())
3606       ren_fn = lambda d, suff: (d.physical_id[0],
3607                                 d.physical_id[1] + "_replaced-%s" % suff)
3608       # build the rename list based on what LVs exist on the node
3609       rlist = []
3610       for to_ren in old_lvs:
3611         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3612         if find_res is not None: # device exists
3613           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3614
3615       info("renaming the old LVs on the target node")
3616       if not rpc.call_blockdev_rename(tgt_node, rlist):
3617         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3618       # now we rename the new LVs to the old LVs
3619       info("renaming the new LVs on the target node")
3620       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3621       if not rpc.call_blockdev_rename(tgt_node, rlist):
3622         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3623
3624       for old, new in zip(old_lvs, new_lvs):
3625         new.logical_id = old.logical_id
3626         cfg.SetDiskID(new, tgt_node)
3627
3628       for disk in old_lvs:
3629         disk.logical_id = ren_fn(disk, temp_suffix)
3630         cfg.SetDiskID(disk, tgt_node)
3631
3632       # now that the new lvs have the old name, we can add them to the device
3633       info("adding new mirror component on %s" % tgt_node)
3634       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3635         for new_lv in new_lvs:
3636           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3637             warning("Can't rollback device %s", "manually cleanup unused"
3638                     " logical volumes")
3639         raise errors.OpExecError("Can't add local storage to drbd")
3640
3641       dev.children = new_lvs
3642       cfg.Update(instance)
3643
3644     # Step: wait for sync
3645
3646     # this can fail as the old devices are degraded and _WaitForSync
3647     # does a combined result over all disks, so we don't check its
3648     # return value
3649     self.proc.LogStep(5, steps_total, "sync devices")
3650     _WaitForSync(cfg, instance, self.proc, unlock=True)
3651
3652     # so check manually all the devices
3653     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3654       cfg.SetDiskID(dev, instance.primary_node)
3655       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3656       if is_degr:
3657         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3658
3659     # Step: remove old storage
3660     self.proc.LogStep(6, steps_total, "removing old storage")
3661     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3662       info("remove logical volumes for %s" % name)
3663       for lv in old_lvs:
3664         cfg.SetDiskID(lv, tgt_node)
3665         if not rpc.call_blockdev_remove(tgt_node, lv):
3666           warning("Can't remove old LV", "manually remove unused LVs")
3667           continue
3668
3669   def _ExecD8Secondary(self, feedback_fn):
3670     """Replace the secondary node for drbd8.
3671
3672     The algorithm for replace is quite complicated:
3673       - for all disks of the instance:
3674         - create new LVs on the new node with same names
3675         - shutdown the drbd device on the old secondary
3676         - disconnect the drbd network on the primary
3677         - create the drbd device on the new secondary
3678         - network attach the drbd on the primary, using an artifice:
3679           the drbd code for Attach() will connect to the network if it
3680           finds a device which is connected to the good local disks but
3681           not network enabled
3682       - wait for sync across all devices
3683       - remove all disks from the old secondary
3684
3685     Failures are not very well handled.
3686
3687     """
3688     steps_total = 6
3689     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3690     instance = self.instance
3691     iv_names = {}
3692     vgname = self.cfg.GetVGName()
3693     # start of work
3694     cfg = self.cfg
3695     old_node = self.tgt_node
3696     new_node = self.new_node
3697     pri_node = instance.primary_node
3698
3699     # Step: check device activation
3700     self.proc.LogStep(1, steps_total, "check device existence")
3701     info("checking volume groups")
3702     my_vg = cfg.GetVGName()
3703     results = rpc.call_vg_list([pri_node, new_node])
3704     if not results:
3705       raise errors.OpExecError("Can't list volume groups on the nodes")
3706     for node in pri_node, new_node:
3707       res = results.get(node, False)
3708       if not res or my_vg not in res:
3709         raise errors.OpExecError("Volume group '%s' not found on %s" %
3710                                  (my_vg, node))
3711     for dev in instance.disks:
3712       if not dev.iv_name in self.op.disks:
3713         continue
3714       info("checking %s on %s" % (dev.iv_name, pri_node))
3715       cfg.SetDiskID(dev, pri_node)
3716       if not rpc.call_blockdev_find(pri_node, dev):
3717         raise errors.OpExecError("Can't find device %s on node %s" %
3718                                  (dev.iv_name, pri_node))
3719
3720     # Step: check other node consistency
3721     self.proc.LogStep(2, steps_total, "check peer consistency")
3722     for dev in instance.disks:
3723       if not dev.iv_name in self.op.disks:
3724         continue
3725       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3726       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3727         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3728                                  " unsafe to replace the secondary" %
3729                                  pri_node)
3730
3731     # Step: create new storage
3732     self.proc.LogStep(3, steps_total, "allocate new storage")
3733     for dev in instance.disks:
3734       size = dev.size
3735       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3736       # since we *always* want to create this LV, we use the
3737       # _Create...OnPrimary (which forces the creation), even if we
3738       # are talking about the secondary node
3739       for new_lv in dev.children:
3740         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3741                                         _GetInstanceInfoText(instance)):
3742           raise errors.OpExecError("Failed to create new LV named '%s' on"
3743                                    " node '%s'" %
3744                                    (new_lv.logical_id[1], new_node))
3745
3746       iv_names[dev.iv_name] = (dev, dev.children)
3747
3748     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3749     for dev in instance.disks:
3750       size = dev.size
3751       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3752       # create new devices on new_node
3753       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3754                               logical_id=(pri_node, new_node,
3755                                           dev.logical_id[2]),
3756                               children=dev.children)
3757       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3758                                         new_drbd, False,
3759                                       _GetInstanceInfoText(instance)):
3760         raise errors.OpExecError("Failed to create new DRBD on"
3761                                  " node '%s'" % new_node)
3762
3763     for dev in instance.disks:
3764       # we have new devices, shutdown the drbd on the old secondary
3765       info("shutting down drbd for %s on old node" % dev.iv_name)
3766       cfg.SetDiskID(dev, old_node)
3767       if not rpc.call_blockdev_shutdown(old_node, dev):
3768         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3769                 "Please cleanup this device manually as soon as possible")
3770
3771     info("detaching primary drbds from the network (=> standalone)")
3772     done = 0
3773     for dev in instance.disks:
3774       cfg.SetDiskID(dev, pri_node)
3775       # set the physical (unique in bdev terms) id to None, meaning
3776       # detach from network
3777       dev.physical_id = (None,) * len(dev.physical_id)
3778       # and 'find' the device, which will 'fix' it to match the
3779       # standalone state
3780       if rpc.call_blockdev_find(pri_node, dev):
3781         done += 1
3782       else:
3783         warning("Failed to detach drbd %s from network, unusual case" %
3784                 dev.iv_name)
3785
3786     if not done:
3787       # no detaches succeeded (very unlikely)
3788       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3789
3790     # if we managed to detach at least one, we update all the disks of
3791     # the instance to point to the new secondary
3792     info("updating instance configuration")
3793     for dev in instance.disks:
3794       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3795       cfg.SetDiskID(dev, pri_node)
3796     cfg.Update(instance)
3797
3798     # and now perform the drbd attach
3799     info("attaching primary drbds to new secondary (standalone => connected)")
3800     failures = []
3801     for dev in instance.disks:
3802       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3803       # since the attach is smart, it's enough to 'find' the device,
3804       # it will automatically activate the network, if the physical_id
3805       # is correct
3806       cfg.SetDiskID(dev, pri_node)
3807       if not rpc.call_blockdev_find(pri_node, dev):
3808         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3809                 "please do a gnt-instance info to see the status of disks")
3810
3811     # this can fail as the old devices are degraded and _WaitForSync
3812     # does a combined result over all disks, so we don't check its
3813     # return value
3814     self.proc.LogStep(5, steps_total, "sync devices")
3815     _WaitForSync(cfg, instance, self.proc, unlock=True)
3816
3817     # so check manually all the devices
3818     for name, (dev, old_lvs) in iv_names.iteritems():
3819       cfg.SetDiskID(dev, pri_node)
3820       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3821       if is_degr:
3822         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3823
3824     self.proc.LogStep(6, steps_total, "removing old storage")
3825     for name, (dev, old_lvs) in iv_names.iteritems():
3826       info("remove logical volumes for %s" % name)
3827       for lv in old_lvs:
3828         cfg.SetDiskID(lv, old_node)
3829         if not rpc.call_blockdev_remove(old_node, lv):
3830           warning("Can't remove LV on old secondary",
3831                   "Cleanup stale volumes by hand")
3832
3833   def Exec(self, feedback_fn):
3834     """Execute disk replacement.
3835
3836     This dispatches the disk replacement to the appropriate handler.
3837
3838     """
3839     instance = self.instance
3840     if instance.disk_template == constants.DT_REMOTE_RAID1:
3841       fn = self._ExecRR1
3842     elif instance.disk_template == constants.DT_DRBD8:
3843       if self.op.remote_node is None:
3844         fn = self._ExecD8DiskOnly
3845       else:
3846         fn = self._ExecD8Secondary
3847     else:
3848       raise errors.ProgrammerError("Unhandled disk replacement case")
3849     return fn(feedback_fn)
3850
3851
3852 class LUQueryInstanceData(NoHooksLU):
3853   """Query runtime instance data.
3854
3855   """
3856   _OP_REQP = ["instances"]
3857
3858   def CheckPrereq(self):
3859     """Check prerequisites.
3860
3861     This only checks the optional instance list against the existing names.
3862
3863     """
3864     if not isinstance(self.op.instances, list):
3865       raise errors.OpPrereqError("Invalid argument type 'instances'")
3866     if self.op.instances:
3867       self.wanted_instances = []
3868       names = self.op.instances
3869       for name in names:
3870         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3871         if instance is None:
3872           raise errors.OpPrereqError("No such instance name '%s'" % name)
3873       self.wanted_instances.append(instance)
3874     else:
3875       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3876                                in self.cfg.GetInstanceList()]
3877     return
3878
3879
3880   def _ComputeDiskStatus(self, instance, snode, dev):
3881     """Compute block device status.
3882
3883     """
3884     self.cfg.SetDiskID(dev, instance.primary_node)
3885     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3886     if dev.dev_type in constants.LDS_DRBD:
3887       # we change the snode then (otherwise we use the one passed in)
3888       if dev.logical_id[0] == instance.primary_node:
3889         snode = dev.logical_id[1]
3890       else:
3891         snode = dev.logical_id[0]
3892
3893     if snode:
3894       self.cfg.SetDiskID(dev, snode)
3895       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3896     else:
3897       dev_sstatus = None
3898
3899     if dev.children:
3900       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3901                       for child in dev.children]
3902     else:
3903       dev_children = []
3904
3905     data = {
3906       "iv_name": dev.iv_name,
3907       "dev_type": dev.dev_type,
3908       "logical_id": dev.logical_id,
3909       "physical_id": dev.physical_id,
3910       "pstatus": dev_pstatus,
3911       "sstatus": dev_sstatus,
3912       "children": dev_children,
3913       }
3914
3915     return data
3916
3917   def Exec(self, feedback_fn):
3918     """Gather and return data"""
3919     result = {}
3920     for instance in self.wanted_instances:
3921       remote_info = rpc.call_instance_info(instance.primary_node,
3922                                                 instance.name)
3923       if remote_info and "state" in remote_info:
3924         remote_state = "up"
3925       else:
3926         remote_state = "down"
3927       if instance.status == "down":
3928         config_state = "down"
3929       else:
3930         config_state = "up"
3931
3932       disks = [self._ComputeDiskStatus(instance, None, device)
3933                for device in instance.disks]
3934
3935       idict = {
3936         "name": instance.name,
3937         "config_state": config_state,
3938         "run_state": remote_state,
3939         "pnode": instance.primary_node,
3940         "snodes": instance.secondary_nodes,
3941         "os": instance.os,
3942         "memory": instance.memory,
3943         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3944         "disks": disks,
3945         "vcpus": instance.vcpus,
3946         }
3947
3948       result[instance.name] = idict
3949
3950     return result
3951
3952
3953 class LUSetInstanceParms(LogicalUnit):
3954   """Modifies an instances's parameters.
3955
3956   """
3957   HPATH = "instance-modify"
3958   HTYPE = constants.HTYPE_INSTANCE
3959   _OP_REQP = ["instance_name"]
3960
3961   def BuildHooksEnv(self):
3962     """Build hooks env.
3963
3964     This runs on the master, primary and secondaries.
3965
3966     """
3967     args = dict()
3968     if self.mem:
3969       args['memory'] = self.mem
3970     if self.vcpus:
3971       args['vcpus'] = self.vcpus
3972     if self.do_ip or self.do_bridge:
3973       if self.do_ip:
3974         ip = self.ip
3975       else:
3976         ip = self.instance.nics[0].ip
3977       if self.bridge:
3978         bridge = self.bridge
3979       else:
3980         bridge = self.instance.nics[0].bridge
3981       args['nics'] = [(ip, bridge)]
3982     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3983     nl = [self.sstore.GetMasterNode(),
3984           self.instance.primary_node] + list(self.instance.secondary_nodes)
3985     return env, nl, nl
3986
3987   def CheckPrereq(self):
3988     """Check prerequisites.
3989
3990     This only checks the instance list against the existing names.
3991
3992     """
3993     self.mem = getattr(self.op, "mem", None)
3994     self.vcpus = getattr(self.op, "vcpus", None)
3995     self.ip = getattr(self.op, "ip", None)
3996     self.bridge = getattr(self.op, "bridge", None)
3997     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3998       raise errors.OpPrereqError("No changes submitted")
3999     if self.mem is not None:
4000       try:
4001         self.mem = int(self.mem)
4002       except ValueError, err:
4003         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4004     if self.vcpus is not None:
4005       try:
4006         self.vcpus = int(self.vcpus)
4007       except ValueError, err:
4008         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4009     if self.ip is not None:
4010       self.do_ip = True
4011       if self.ip.lower() == "none":
4012         self.ip = None
4013       else:
4014         if not utils.IsValidIP(self.ip):
4015           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4016     else:
4017       self.do_ip = False
4018     self.do_bridge = (self.bridge is not None)
4019
4020     instance = self.cfg.GetInstanceInfo(
4021       self.cfg.ExpandInstanceName(self.op.instance_name))
4022     if instance is None:
4023       raise errors.OpPrereqError("No such instance name '%s'" %
4024                                  self.op.instance_name)
4025     self.op.instance_name = instance.name
4026     self.instance = instance
4027     return
4028
4029   def Exec(self, feedback_fn):
4030     """Modifies an instance.
4031
4032     All parameters take effect only at the next restart of the instance.
4033     """
4034     result = []
4035     instance = self.instance
4036     if self.mem:
4037       instance.memory = self.mem
4038       result.append(("mem", self.mem))
4039     if self.vcpus:
4040       instance.vcpus = self.vcpus
4041       result.append(("vcpus",  self.vcpus))
4042     if self.do_ip:
4043       instance.nics[0].ip = self.ip
4044       result.append(("ip", self.ip))
4045     if self.bridge:
4046       instance.nics[0].bridge = self.bridge
4047       result.append(("bridge", self.bridge))
4048
4049     self.cfg.AddInstance(instance)
4050
4051     return result
4052
4053
4054 class LUQueryExports(NoHooksLU):
4055   """Query the exports list
4056
4057   """
4058   _OP_REQP = []
4059
4060   def CheckPrereq(self):
4061     """Check that the nodelist contains only existing nodes.
4062
4063     """
4064     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4065
4066   def Exec(self, feedback_fn):
4067     """Compute the list of all the exported system images.
4068
4069     Returns:
4070       a dictionary with the structure node->(export-list)
4071       where export-list is a list of the instances exported on
4072       that node.
4073
4074     """
4075     return rpc.call_export_list(self.nodes)
4076
4077
4078 class LUExportInstance(LogicalUnit):
4079   """Export an instance to an image in the cluster.
4080
4081   """
4082   HPATH = "instance-export"
4083   HTYPE = constants.HTYPE_INSTANCE
4084   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4085
4086   def BuildHooksEnv(self):
4087     """Build hooks env.
4088
4089     This will run on the master, primary node and target node.
4090
4091     """
4092     env = {
4093       "EXPORT_NODE": self.op.target_node,
4094       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4095       }
4096     env.update(_BuildInstanceHookEnvByObject(self.instance))
4097     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4098           self.op.target_node]
4099     return env, nl, nl
4100
4101   def CheckPrereq(self):
4102     """Check prerequisites.
4103
4104     This checks that the instance name is a valid one.
4105
4106     """
4107     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4108     self.instance = self.cfg.GetInstanceInfo(instance_name)
4109     if self.instance is None:
4110       raise errors.OpPrereqError("Instance '%s' not found" %
4111                                  self.op.instance_name)
4112
4113     # node verification
4114     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4115     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4116
4117     if self.dst_node is None:
4118       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4119                                  self.op.target_node)
4120     self.op.target_node = self.dst_node.name
4121
4122   def Exec(self, feedback_fn):
4123     """Export an instance to an image in the cluster.
4124
4125     """
4126     instance = self.instance
4127     dst_node = self.dst_node
4128     src_node = instance.primary_node
4129     # shutdown the instance, unless requested not to do so
4130     if self.op.shutdown:
4131       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4132       self.proc.ChainOpCode(op)
4133
4134     vgname = self.cfg.GetVGName()
4135
4136     snap_disks = []
4137
4138     try:
4139       for disk in instance.disks:
4140         if disk.iv_name == "sda":
4141           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4142           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4143
4144           if not new_dev_name:
4145             logger.Error("could not snapshot block device %s on node %s" %
4146                          (disk.logical_id[1], src_node))
4147           else:
4148             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4149                                       logical_id=(vgname, new_dev_name),
4150                                       physical_id=(vgname, new_dev_name),
4151                                       iv_name=disk.iv_name)
4152             snap_disks.append(new_dev)
4153
4154     finally:
4155       if self.op.shutdown:
4156         op = opcodes.OpStartupInstance(instance_name=instance.name,
4157                                        force=False)
4158         self.proc.ChainOpCode(op)
4159
4160     # TODO: check for size
4161
4162     for dev in snap_disks:
4163       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4164                                            instance):
4165         logger.Error("could not export block device %s from node"
4166                      " %s to node %s" %
4167                      (dev.logical_id[1], src_node, dst_node.name))
4168       if not rpc.call_blockdev_remove(src_node, dev):
4169         logger.Error("could not remove snapshot block device %s from"
4170                      " node %s" % (dev.logical_id[1], src_node))
4171
4172     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4173       logger.Error("could not finalize export for instance %s on node %s" %
4174                    (instance.name, dst_node.name))
4175
4176     nodelist = self.cfg.GetNodeList()
4177     nodelist.remove(dst_node.name)
4178
4179     # on one-node clusters nodelist will be empty after the removal
4180     # if we proceed the backup would be removed because OpQueryExports
4181     # substitutes an empty list with the full cluster node list.
4182     if nodelist:
4183       op = opcodes.OpQueryExports(nodes=nodelist)
4184       exportlist = self.proc.ChainOpCode(op)
4185       for node in exportlist:
4186         if instance.name in exportlist[node]:
4187           if not rpc.call_export_remove(node, instance.name):
4188             logger.Error("could not remove older export for instance %s"
4189                          " on node %s" % (instance.name, node))
4190
4191
4192 class TagsLU(NoHooksLU):
4193   """Generic tags LU.
4194
4195   This is an abstract class which is the parent of all the other tags LUs.
4196
4197   """
4198   def CheckPrereq(self):
4199     """Check prerequisites.
4200
4201     """
4202     if self.op.kind == constants.TAG_CLUSTER:
4203       self.target = self.cfg.GetClusterInfo()
4204     elif self.op.kind == constants.TAG_NODE:
4205       name = self.cfg.ExpandNodeName(self.op.name)
4206       if name is None:
4207         raise errors.OpPrereqError("Invalid node name (%s)" %
4208                                    (self.op.name,))
4209       self.op.name = name
4210       self.target = self.cfg.GetNodeInfo(name)
4211     elif self.op.kind == constants.TAG_INSTANCE:
4212       name = self.cfg.ExpandInstanceName(self.op.name)
4213       if name is None:
4214         raise errors.OpPrereqError("Invalid instance name (%s)" %
4215                                    (self.op.name,))
4216       self.op.name = name
4217       self.target = self.cfg.GetInstanceInfo(name)
4218     else:
4219       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4220                                  str(self.op.kind))
4221
4222
4223 class LUGetTags(TagsLU):
4224   """Returns the tags of a given object.
4225
4226   """
4227   _OP_REQP = ["kind", "name"]
4228
4229   def Exec(self, feedback_fn):
4230     """Returns the tag list.
4231
4232     """
4233     return self.target.GetTags()
4234
4235
4236 class LUSearchTags(NoHooksLU):
4237   """Searches the tags for a given pattern.
4238
4239   """
4240   _OP_REQP = ["pattern"]
4241
4242   def CheckPrereq(self):
4243     """Check prerequisites.
4244
4245     This checks the pattern passed for validity by compiling it.
4246
4247     """
4248     try:
4249       self.re = re.compile(self.op.pattern)
4250     except re.error, err:
4251       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4252                                  (self.op.pattern, err))
4253
4254   def Exec(self, feedback_fn):
4255     """Returns the tag list.
4256
4257     """
4258     cfg = self.cfg
4259     tgts = [("/cluster", cfg.GetClusterInfo())]
4260     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4261     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4262     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4263     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4264     results = []
4265     for path, target in tgts:
4266       for tag in target.GetTags():
4267         if self.re.search(tag):
4268           results.append((path, tag))
4269     return results
4270
4271
4272 class LUAddTags(TagsLU):
4273   """Sets a tag on a given object.
4274
4275   """
4276   _OP_REQP = ["kind", "name", "tags"]
4277
4278   def CheckPrereq(self):
4279     """Check prerequisites.
4280
4281     This checks the type and length of the tag name and value.
4282
4283     """
4284     TagsLU.CheckPrereq(self)
4285     for tag in self.op.tags:
4286       objects.TaggableObject.ValidateTag(tag)
4287
4288   def Exec(self, feedback_fn):
4289     """Sets the tag.
4290
4291     """
4292     try:
4293       for tag in self.op.tags:
4294         self.target.AddTag(tag)
4295     except errors.TagError, err:
4296       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4297     try:
4298       self.cfg.Update(self.target)
4299     except errors.ConfigurationError:
4300       raise errors.OpRetryError("There has been a modification to the"
4301                                 " config file and the operation has been"
4302                                 " aborted. Please retry.")
4303
4304
4305 class LUDelTags(TagsLU):
4306   """Delete a list of tags from a given object.
4307
4308   """
4309   _OP_REQP = ["kind", "name", "tags"]
4310
4311   def CheckPrereq(self):
4312     """Check prerequisites.
4313
4314     This checks that we have the given tag.
4315
4316     """
4317     TagsLU.CheckPrereq(self)
4318     for tag in self.op.tags:
4319       objects.TaggableObject.ValidateTag(tag)
4320     del_tags = frozenset(self.op.tags)
4321     cur_tags = self.target.GetTags()
4322     if not del_tags <= cur_tags:
4323       diff_tags = del_tags - cur_tags
4324       diff_names = ["'%s'" % tag for tag in diff_tags]
4325       diff_names.sort()
4326       raise errors.OpPrereqError("Tag(s) %s not found" %
4327                                  (",".join(diff_names)))
4328
4329   def Exec(self, feedback_fn):
4330     """Remove the tag from the object.
4331
4332     """
4333     for tag in self.op.tags:
4334       self.target.RemoveTag(tag)
4335     try:
4336       self.cfg.Update(self.target)
4337     except errors.ConfigurationError:
4338       raise errors.OpRetryError("There has been a modification to the"
4339                                 " config file and the operation has been"
4340                                 " aborted. Please retry.")