Various code style fixes for strings.
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _AddHostToEtcHosts(hostname):
167   """Wrapper around utils.SetEtcHostsEntry.
168
169   """
170   hi = utils.HostInfo(name=hostname)
171   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172
173
174 def _RemoveHostFromEtcHosts(hostname):
175   """Wrapper around utils.RemoveEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181
182
183 def _GetWantedNodes(lu, nodes):
184   """Returns list of checked and expanded node names.
185
186   Args:
187     nodes: List of nodes (strings) or None for all
188
189   """
190   if not isinstance(nodes, list):
191     raise errors.OpPrereqError("Invalid argument type 'nodes'")
192
193   if nodes:
194     wanted = []
195
196     for name in nodes:
197       node = lu.cfg.ExpandNodeName(name)
198       if node is None:
199         raise errors.OpPrereqError("No such node name '%s'" % name)
200       wanted.append(node)
201
202   else:
203     wanted = lu.cfg.GetNodeList()
204   return utils.NiceSort(wanted)
205
206
207 def _GetWantedInstances(lu, instances):
208   """Returns list of checked and expanded instance names.
209
210   Args:
211     instances: List of instances (strings) or None for all
212
213   """
214   if not isinstance(instances, list):
215     raise errors.OpPrereqError("Invalid argument type 'instances'")
216
217   if instances:
218     wanted = []
219
220     for name in instances:
221       instance = lu.cfg.ExpandInstanceName(name)
222       if instance is None:
223         raise errors.OpPrereqError("No such instance name '%s'" % name)
224       wanted.append(instance)
225
226   else:
227     wanted = lu.cfg.GetInstanceList()
228   return utils.NiceSort(wanted)
229
230
231 def _CheckOutputFields(static, dynamic, selected):
232   """Checks whether all selected fields are valid.
233
234   Args:
235     static: Static fields
236     dynamic: Dynamic fields
237
238   """
239   static_fields = frozenset(static)
240   dynamic_fields = frozenset(dynamic)
241
242   all_fields = static_fields | dynamic_fields
243
244   if not all_fields.issuperset(selected):
245     raise errors.OpPrereqError("Unknown output fields selected: %s"
246                                % ",".join(frozenset(selected).
247                                           difference(all_fields)))
248
249
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251                           memory, vcpus, nics):
252   """Builds instance related env variables for hooks from single variables.
253
254   Args:
255     secondary_nodes: List of secondary nodes as strings
256   """
257   env = {
258     "OP_TARGET": name,
259     "INSTANCE_NAME": name,
260     "INSTANCE_PRIMARY": primary_node,
261     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262     "INSTANCE_OS_TYPE": os_type,
263     "INSTANCE_STATUS": status,
264     "INSTANCE_MEMORY": memory,
265     "INSTANCE_VCPUS": vcpus,
266   }
267
268   if nics:
269     nic_count = len(nics)
270     for idx, (ip, bridge) in enumerate(nics):
271       if ip is None:
272         ip = ""
273       env["INSTANCE_NIC%d_IP" % idx] = ip
274       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275   else:
276     nic_count = 0
277
278   env["INSTANCE_NIC_COUNT"] = nic_count
279
280   return env
281
282
283 def _BuildInstanceHookEnvByObject(instance, override=None):
284   """Builds instance related env variables for hooks from an object.
285
286   Args:
287     instance: objects.Instance object of instance
288     override: dict of values to override
289   """
290   args = {
291     'name': instance.name,
292     'primary_node': instance.primary_node,
293     'secondary_nodes': instance.secondary_nodes,
294     'os_type': instance.os,
295     'status': instance.os,
296     'memory': instance.memory,
297     'vcpus': instance.vcpus,
298     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
299   }
300   if override:
301     args.update(override)
302   return _BuildInstanceHookEnv(**args)
303
304
305 def _UpdateKnownHosts(fullnode, ip, pubkey):
306   """Ensure a node has a correct known_hosts entry.
307
308   Args:
309     fullnode - Fully qualified domain name of host. (str)
310     ip       - IPv4 address of host (str)
311     pubkey   - the public key of the cluster
312
313   """
314   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
316   else:
317     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
318
319   inthere = False
320
321   save_lines = []
322   add_lines = []
323   removed = False
324
325   for rawline in f:
326     logger.Debug('read %s' % (repr(rawline),))
327
328     parts = rawline.rstrip('\r\n').split()
329
330     # Ignore unwanted lines
331     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
332       fields = parts[0].split(',')
333       key = parts[2]
334
335       haveall = True
336       havesome = False
337       for spec in [ ip, fullnode ]:
338         if spec not in fields:
339           haveall = False
340         if spec in fields:
341           havesome = True
342
343       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344       if haveall and key == pubkey:
345         inthere = True
346         save_lines.append(rawline)
347         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
348         continue
349
350       if havesome and (not haveall or key != pubkey):
351         removed = True
352         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
353         continue
354
355     save_lines.append(rawline)
356
357   if not inthere:
358     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
360
361   if removed:
362     save_lines = save_lines + add_lines
363
364     # Write a new file and replace old.
365     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
366                                    constants.DATA_DIR)
367     newfile = os.fdopen(fd, 'w')
368     try:
369       newfile.write(''.join(save_lines))
370     finally:
371       newfile.close()
372     logger.Debug("Wrote new known_hosts.")
373     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
374
375   elif add_lines:
376     # Simply appending a new line will do the trick.
377     f.seek(0, 2)
378     for add in add_lines:
379       f.write(add)
380
381   f.close()
382
383
384 def _HasValidVG(vglist, vgname):
385   """Checks if the volume group list is valid.
386
387   A non-None return value means there's an error, and the return value
388   is the error message.
389
390   """
391   vgsize = vglist.get(vgname, None)
392   if vgsize is None:
393     return "volume group '%s' missing" % vgname
394   elif vgsize < 20480:
395     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
396             (vgname, vgsize))
397   return None
398
399
400 def _InitSSHSetup(node):
401   """Setup the SSH configuration for the cluster.
402
403
404   This generates a dsa keypair for root, adds the pub key to the
405   permitted hosts and adds the hostkey to its own known hosts.
406
407   Args:
408     node: the name of this host as a fqdn
409
410   """
411   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
412
413   for name in priv_key, pub_key:
414     if os.path.exists(name):
415       utils.CreateBackup(name)
416     utils.RemoveFile(name)
417
418   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
419                          "-f", priv_key,
420                          "-q", "-N", ""])
421   if result.failed:
422     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
423                              result.output)
424
425   f = open(pub_key, 'r')
426   try:
427     utils.AddAuthorizedKey(auth_keys, f.read(8192))
428   finally:
429     f.close()
430
431
432 def _InitGanetiServerSetup(ss):
433   """Setup the necessary configuration for the initial node daemon.
434
435   This creates the nodepass file containing the shared password for
436   the cluster and also generates the SSL certificate.
437
438   """
439   # Create pseudo random password
440   randpass = sha.new(os.urandom(64)).hexdigest()
441   # and write it into sstore
442   ss.SetKey(ss.SS_NODED_PASS, randpass)
443
444   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445                          "-days", str(365*5), "-nodes", "-x509",
446                          "-keyout", constants.SSL_CERT_FILE,
447                          "-out", constants.SSL_CERT_FILE, "-batch"])
448   if result.failed:
449     raise errors.OpExecError("could not generate server ssl cert, command"
450                              " %s had exitcode %s and error message %s" %
451                              (result.cmd, result.exit_code, result.output))
452
453   os.chmod(constants.SSL_CERT_FILE, 0400)
454
455   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
456
457   if result.failed:
458     raise errors.OpExecError("Could not start the node daemon, command %s"
459                              " had exitcode %s and error %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462
463 def _CheckInstanceBridgesExist(instance):
464   """Check that the brigdes needed by an instance exist.
465
466   """
467   # check bridges existance
468   brlist = [nic.bridge for nic in instance.nics]
469   if not rpc.call_bridges_exist(instance.primary_node, brlist):
470     raise errors.OpPrereqError("one or more target bridges %s does not"
471                                " exist on destination node '%s'" %
472                                (brlist, instance.primary_node))
473
474
475 class LUInitCluster(LogicalUnit):
476   """Initialise the cluster.
477
478   """
479   HPATH = "cluster-init"
480   HTYPE = constants.HTYPE_CLUSTER
481   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482               "def_bridge", "master_netdev"]
483   REQ_CLUSTER = False
484
485   def BuildHooksEnv(self):
486     """Build hooks env.
487
488     Notes: Since we don't require a cluster, we must manually add
489     ourselves in the post-run node list.
490
491     """
492     env = {"OP_TARGET": self.op.cluster_name}
493     return env, [], [self.hostname.name]
494
495   def CheckPrereq(self):
496     """Verify that the passed name is a valid one.
497
498     """
499     if config.ConfigWriter.IsCluster():
500       raise errors.OpPrereqError("Cluster is already initialised")
501
502     self.hostname = hostname = utils.HostInfo()
503
504     if hostname.ip.startswith("127."):
505       raise errors.OpPrereqError("This host's IP resolves to the private"
506                                  " range (%s). Please fix DNS or /etc/hosts." %
507                                  (hostname.ip,))
508
509     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
510
511     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
512                          constants.DEFAULT_NODED_PORT):
513       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
514                                  " to %s,\nbut this ip address does not"
515                                  " belong to this host."
516                                  " Aborting." % hostname.ip)
517
518     secondary_ip = getattr(self.op, "secondary_ip", None)
519     if secondary_ip and not utils.IsValidIP(secondary_ip):
520       raise errors.OpPrereqError("Invalid secondary ip given")
521     if (secondary_ip and
522         secondary_ip != hostname.ip and
523         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
524                            constants.DEFAULT_NODED_PORT))):
525       raise errors.OpPrereqError("You gave %s as secondary IP,"
526                                  " but it does not belong to this host." %
527                                  secondary_ip)
528     self.secondary_ip = secondary_ip
529
530     # checks presence of the volume group given
531     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
532
533     if vgstatus:
534       raise errors.OpPrereqError("Error: %s" % vgstatus)
535
536     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
537                     self.op.mac_prefix):
538       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
539                                  self.op.mac_prefix)
540
541     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
542       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
543                                  self.op.hypervisor_type)
544
545     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
546     if result.failed:
547       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
548                                  (self.op.master_netdev,
549                                   result.output.strip()))
550
551     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
552             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
553       raise errors.OpPrereqError("Init.d script '%s' missing or not"
554                                  " executable." % constants.NODE_INITD_SCRIPT)
555
556   def Exec(self, feedback_fn):
557     """Initialize the cluster.
558
559     """
560     clustername = self.clustername
561     hostname = self.hostname
562
563     # set up the simple store
564     self.sstore = ss = ssconf.SimpleStore()
565     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
566     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
567     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
568     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
569     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
570
571     # set up the inter-node password and certificate
572     _InitGanetiServerSetup(ss)
573
574     # start the master ip
575     rpc.call_node_start_master(hostname.name)
576
577     # set up ssh config and /etc/hosts
578     f = open(constants.SSH_HOST_RSA_PUB, 'r')
579     try:
580       sshline = f.read()
581     finally:
582       f.close()
583     sshkey = sshline.split(" ")[1]
584
585     _AddHostToEtcHosts(hostname.name)
586
587     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
588
589     _InitSSHSetup(hostname.name)
590
591     # init of cluster config file
592     self.cfg = cfgw = config.ConfigWriter()
593     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
594                     sshkey, self.op.mac_prefix,
595                     self.op.vg_name, self.op.def_bridge)
596
597
598 class LUDestroyCluster(NoHooksLU):
599   """Logical unit for destroying the cluster.
600
601   """
602   _OP_REQP = []
603
604   def CheckPrereq(self):
605     """Check prerequisites.
606
607     This checks whether the cluster is empty.
608
609     Any errors are signalled by raising errors.OpPrereqError.
610
611     """
612     master = self.sstore.GetMasterNode()
613
614     nodelist = self.cfg.GetNodeList()
615     if len(nodelist) != 1 or nodelist[0] != master:
616       raise errors.OpPrereqError("There are still %d node(s) in"
617                                  " this cluster." % (len(nodelist) - 1))
618     instancelist = self.cfg.GetInstanceList()
619     if instancelist:
620       raise errors.OpPrereqError("There are still %d instance(s) in"
621                                  " this cluster." % len(instancelist))
622
623   def Exec(self, feedback_fn):
624     """Destroys the cluster.
625
626     """
627     master = self.sstore.GetMasterNode()
628     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
629     utils.CreateBackup(priv_key)
630     utils.CreateBackup(pub_key)
631     rpc.call_node_leave_cluster(master)
632
633
634 class LUVerifyCluster(NoHooksLU):
635   """Verifies the cluster status.
636
637   """
638   _OP_REQP = []
639
640   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
641                   remote_version, feedback_fn):
642     """Run multiple tests against a node.
643
644     Test list:
645       - compares ganeti version
646       - checks vg existance and size > 20G
647       - checks config file checksum
648       - checks ssh to other nodes
649
650     Args:
651       node: name of the node to check
652       file_list: required list of files
653       local_cksum: dictionary of local files and their checksums
654
655     """
656     # compares ganeti version
657     local_version = constants.PROTOCOL_VERSION
658     if not remote_version:
659       feedback_fn(" - ERROR: connection to %s failed" % (node))
660       return True
661
662     if local_version != remote_version:
663       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
664                       (local_version, node, remote_version))
665       return True
666
667     # checks vg existance and size > 20G
668
669     bad = False
670     if not vglist:
671       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
672                       (node,))
673       bad = True
674     else:
675       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
676       if vgstatus:
677         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
678         bad = True
679
680     # checks config file checksum
681     # checks ssh to any
682
683     if 'filelist' not in node_result:
684       bad = True
685       feedback_fn("  - ERROR: node hasn't returned file checksum data")
686     else:
687       remote_cksum = node_result['filelist']
688       for file_name in file_list:
689         if file_name not in remote_cksum:
690           bad = True
691           feedback_fn("  - ERROR: file '%s' missing" % file_name)
692         elif remote_cksum[file_name] != local_cksum[file_name]:
693           bad = True
694           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
695
696     if 'nodelist' not in node_result:
697       bad = True
698       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
699     else:
700       if node_result['nodelist']:
701         bad = True
702         for node in node_result['nodelist']:
703           feedback_fn("  - ERROR: communication with node '%s': %s" %
704                           (node, node_result['nodelist'][node]))
705     hyp_result = node_result.get('hypervisor', None)
706     if hyp_result is not None:
707       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
708     return bad
709
710   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
711     """Verify an instance.
712
713     This function checks to see if the required block devices are
714     available on the instance's node.
715
716     """
717     bad = False
718
719     instancelist = self.cfg.GetInstanceList()
720     if not instance in instancelist:
721       feedback_fn("  - ERROR: instance %s not in instance list %s" %
722                       (instance, instancelist))
723       bad = True
724
725     instanceconfig = self.cfg.GetInstanceInfo(instance)
726     node_current = instanceconfig.primary_node
727
728     node_vol_should = {}
729     instanceconfig.MapLVsByNode(node_vol_should)
730
731     for node in node_vol_should:
732       for volume in node_vol_should[node]:
733         if node not in node_vol_is or volume not in node_vol_is[node]:
734           feedback_fn("  - ERROR: volume %s missing on node %s" %
735                           (volume, node))
736           bad = True
737
738     if not instanceconfig.status == 'down':
739       if not instance in node_instance[node_current]:
740         feedback_fn("  - ERROR: instance %s not running on node %s" %
741                         (instance, node_current))
742         bad = True
743
744     for node in node_instance:
745       if (not node == node_current):
746         if instance in node_instance[node]:
747           feedback_fn("  - ERROR: instance %s should not run on node %s" %
748                           (instance, node))
749           bad = True
750
751     return bad
752
753   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
754     """Verify if there are any unknown volumes in the cluster.
755
756     The .os, .swap and backup volumes are ignored. All other volumes are
757     reported as unknown.
758
759     """
760     bad = False
761
762     for node in node_vol_is:
763       for volume in node_vol_is[node]:
764         if node not in node_vol_should or volume not in node_vol_should[node]:
765           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
766                       (volume, node))
767           bad = True
768     return bad
769
770   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
771     """Verify the list of running instances.
772
773     This checks what instances are running but unknown to the cluster.
774
775     """
776     bad = False
777     for node in node_instance:
778       for runninginstance in node_instance[node]:
779         if runninginstance not in instancelist:
780           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
781                           (runninginstance, node))
782           bad = True
783     return bad
784
785   def CheckPrereq(self):
786     """Check prerequisites.
787
788     This has no prerequisites.
789
790     """
791     pass
792
793   def Exec(self, feedback_fn):
794     """Verify integrity of cluster, performing various test on nodes.
795
796     """
797     bad = False
798     feedback_fn("* Verifying global settings")
799     for msg in self.cfg.VerifyConfig():
800       feedback_fn("  - ERROR: %s" % msg)
801
802     vg_name = self.cfg.GetVGName()
803     nodelist = utils.NiceSort(self.cfg.GetNodeList())
804     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
805     node_volume = {}
806     node_instance = {}
807
808     # FIXME: verify OS list
809     # do local checksums
810     file_names = list(self.sstore.GetFileList())
811     file_names.append(constants.SSL_CERT_FILE)
812     file_names.append(constants.CLUSTER_CONF_FILE)
813     local_checksums = utils.FingerprintFiles(file_names)
814
815     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
816     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
817     all_instanceinfo = rpc.call_instance_list(nodelist)
818     all_vglist = rpc.call_vg_list(nodelist)
819     node_verify_param = {
820       'filelist': file_names,
821       'nodelist': nodelist,
822       'hypervisor': None,
823       }
824     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
825     all_rversion = rpc.call_version(nodelist)
826
827     for node in nodelist:
828       feedback_fn("* Verifying node %s" % node)
829       result = self._VerifyNode(node, file_names, local_checksums,
830                                 all_vglist[node], all_nvinfo[node],
831                                 all_rversion[node], feedback_fn)
832       bad = bad or result
833
834       # node_volume
835       volumeinfo = all_volumeinfo[node]
836
837       if type(volumeinfo) != dict:
838         feedback_fn("  - ERROR: connection to %s failed" % (node,))
839         bad = True
840         continue
841
842       node_volume[node] = volumeinfo
843
844       # node_instance
845       nodeinstance = all_instanceinfo[node]
846       if type(nodeinstance) != list:
847         feedback_fn("  - ERROR: connection to %s failed" % (node,))
848         bad = True
849         continue
850
851       node_instance[node] = nodeinstance
852
853     node_vol_should = {}
854
855     for instance in instancelist:
856       feedback_fn("* Verifying instance %s" % instance)
857       result =  self._VerifyInstance(instance, node_volume, node_instance,
858                                      feedback_fn)
859       bad = bad or result
860
861       inst_config = self.cfg.GetInstanceInfo(instance)
862
863       inst_config.MapLVsByNode(node_vol_should)
864
865     feedback_fn("* Verifying orphan volumes")
866     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
867                                        feedback_fn)
868     bad = bad or result
869
870     feedback_fn("* Verifying remaining instances")
871     result = self._VerifyOrphanInstances(instancelist, node_instance,
872                                          feedback_fn)
873     bad = bad or result
874
875     return int(bad)
876
877
878 class LURenameCluster(LogicalUnit):
879   """Rename the cluster.
880
881   """
882   HPATH = "cluster-rename"
883   HTYPE = constants.HTYPE_CLUSTER
884   _OP_REQP = ["name"]
885
886   def BuildHooksEnv(self):
887     """Build hooks env.
888
889     """
890     env = {
891       "OP_TARGET": self.op.sstore.GetClusterName(),
892       "NEW_NAME": self.op.name,
893       }
894     mn = self.sstore.GetMasterNode()
895     return env, [mn], [mn]
896
897   def CheckPrereq(self):
898     """Verify that the passed name is a valid one.
899
900     """
901     hostname = utils.HostInfo(self.op.name)
902
903     new_name = hostname.name
904     self.ip = new_ip = hostname.ip
905     old_name = self.sstore.GetClusterName()
906     old_ip = self.sstore.GetMasterIP()
907     if new_name == old_name and new_ip == old_ip:
908       raise errors.OpPrereqError("Neither the name nor the IP address of the"
909                                  " cluster has changed")
910     if new_ip != old_ip:
911       result = utils.RunCmd(["fping", "-q", new_ip])
912       if not result.failed:
913         raise errors.OpPrereqError("The given cluster IP address (%s) is"
914                                    " reachable on the network. Aborting." %
915                                    new_ip)
916
917     self.op.name = new_name
918
919   def Exec(self, feedback_fn):
920     """Rename the cluster.
921
922     """
923     clustername = self.op.name
924     ip = self.ip
925     ss = self.sstore
926
927     # shutdown the master IP
928     master = ss.GetMasterNode()
929     if not rpc.call_node_stop_master(master):
930       raise errors.OpExecError("Could not disable the master role")
931
932     try:
933       # modify the sstore
934       ss.SetKey(ss.SS_MASTER_IP, ip)
935       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
936
937       # Distribute updated ss config to all nodes
938       myself = self.cfg.GetNodeInfo(master)
939       dist_nodes = self.cfg.GetNodeList()
940       if myself.name in dist_nodes:
941         dist_nodes.remove(myself.name)
942
943       logger.Debug("Copying updated ssconf data to all nodes")
944       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
945         fname = ss.KeyToFilename(keyname)
946         result = rpc.call_upload_file(dist_nodes, fname)
947         for to_node in dist_nodes:
948           if not result[to_node]:
949             logger.Error("copy of file %s to node %s failed" %
950                          (fname, to_node))
951     finally:
952       if not rpc.call_node_start_master(master):
953         logger.Error("Could not re-enable the master role on the master,"
954                      " please restart manually.")
955
956
957 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
958   """Sleep and poll for an instance's disk to sync.
959
960   """
961   if not instance.disks:
962     return True
963
964   if not oneshot:
965     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
966
967   node = instance.primary_node
968
969   for dev in instance.disks:
970     cfgw.SetDiskID(dev, node)
971
972   retries = 0
973   while True:
974     max_time = 0
975     done = True
976     cumul_degraded = False
977     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
978     if not rstats:
979       proc.LogWarning("Can't get any data from node %s" % node)
980       retries += 1
981       if retries >= 10:
982         raise errors.RemoteError("Can't contact node %s for mirror data,"
983                                  " aborting." % node)
984       time.sleep(6)
985       continue
986     retries = 0
987     for i in range(len(rstats)):
988       mstat = rstats[i]
989       if mstat is None:
990         proc.LogWarning("Can't compute data for node %s/%s" %
991                         (node, instance.disks[i].iv_name))
992         continue
993       # we ignore the ldisk parameter
994       perc_done, est_time, is_degraded, _ = mstat
995       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
996       if perc_done is not None:
997         done = False
998         if est_time is not None:
999           rem_time = "%d estimated seconds remaining" % est_time
1000           max_time = est_time
1001         else:
1002           rem_time = "no time estimate"
1003         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1004                      (instance.disks[i].iv_name, perc_done, rem_time))
1005     if done or oneshot:
1006       break
1007
1008     if unlock:
1009       utils.Unlock('cmd')
1010     try:
1011       time.sleep(min(60, max_time))
1012     finally:
1013       if unlock:
1014         utils.Lock('cmd')
1015
1016   if done:
1017     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1018   return not cumul_degraded
1019
1020
1021 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1022   """Check that mirrors are not degraded.
1023
1024   The ldisk parameter, if True, will change the test from the
1025   is_degraded attribute (which represents overall non-ok status for
1026   the device(s)) to the ldisk (representing the local storage status).
1027
1028   """
1029   cfgw.SetDiskID(dev, node)
1030   if ldisk:
1031     idx = 6
1032   else:
1033     idx = 5
1034
1035   result = True
1036   if on_primary or dev.AssembleOnSecondary():
1037     rstats = rpc.call_blockdev_find(node, dev)
1038     if not rstats:
1039       logger.ToStderr("Can't get any data from node %s" % node)
1040       result = False
1041     else:
1042       result = result and (not rstats[idx])
1043   if dev.children:
1044     for child in dev.children:
1045       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1046
1047   return result
1048
1049
1050 class LUDiagnoseOS(NoHooksLU):
1051   """Logical unit for OS diagnose/query.
1052
1053   """
1054   _OP_REQP = []
1055
1056   def CheckPrereq(self):
1057     """Check prerequisites.
1058
1059     This always succeeds, since this is a pure query LU.
1060
1061     """
1062     return
1063
1064   def Exec(self, feedback_fn):
1065     """Compute the list of OSes.
1066
1067     """
1068     node_list = self.cfg.GetNodeList()
1069     node_data = rpc.call_os_diagnose(node_list)
1070     if node_data == False:
1071       raise errors.OpExecError("Can't gather the list of OSes")
1072     return node_data
1073
1074
1075 class LURemoveNode(LogicalUnit):
1076   """Logical unit for removing a node.
1077
1078   """
1079   HPATH = "node-remove"
1080   HTYPE = constants.HTYPE_NODE
1081   _OP_REQP = ["node_name"]
1082
1083   def BuildHooksEnv(self):
1084     """Build hooks env.
1085
1086     This doesn't run on the target node in the pre phase as a failed
1087     node would not allows itself to run.
1088
1089     """
1090     env = {
1091       "OP_TARGET": self.op.node_name,
1092       "NODE_NAME": self.op.node_name,
1093       }
1094     all_nodes = self.cfg.GetNodeList()
1095     all_nodes.remove(self.op.node_name)
1096     return env, all_nodes, all_nodes
1097
1098   def CheckPrereq(self):
1099     """Check prerequisites.
1100
1101     This checks:
1102      - the node exists in the configuration
1103      - it does not have primary or secondary instances
1104      - it's not the master
1105
1106     Any errors are signalled by raising errors.OpPrereqError.
1107
1108     """
1109     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1110     if node is None:
1111       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1112
1113     instance_list = self.cfg.GetInstanceList()
1114
1115     masternode = self.sstore.GetMasterNode()
1116     if node.name == masternode:
1117       raise errors.OpPrereqError("Node is the master node,"
1118                                  " you need to failover first.")
1119
1120     for instance_name in instance_list:
1121       instance = self.cfg.GetInstanceInfo(instance_name)
1122       if node.name == instance.primary_node:
1123         raise errors.OpPrereqError("Instance %s still running on the node,"
1124                                    " please remove first." % instance_name)
1125       if node.name in instance.secondary_nodes:
1126         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1127                                    " please remove first." % instance_name)
1128     self.op.node_name = node.name
1129     self.node = node
1130
1131   def Exec(self, feedback_fn):
1132     """Removes the node from the cluster.
1133
1134     """
1135     node = self.node
1136     logger.Info("stopping the node daemon and removing configs from node %s" %
1137                 node.name)
1138
1139     rpc.call_node_leave_cluster(node.name)
1140
1141     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1142
1143     logger.Info("Removing node %s from config" % node.name)
1144
1145     self.cfg.RemoveNode(node.name)
1146
1147     _RemoveHostFromEtcHosts(node.name)
1148
1149
1150 class LUQueryNodes(NoHooksLU):
1151   """Logical unit for querying nodes.
1152
1153   """
1154   _OP_REQP = ["output_fields", "names"]
1155
1156   def CheckPrereq(self):
1157     """Check prerequisites.
1158
1159     This checks that the fields required are valid output fields.
1160
1161     """
1162     self.dynamic_fields = frozenset(["dtotal", "dfree",
1163                                      "mtotal", "mnode", "mfree",
1164                                      "bootid"])
1165
1166     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1167                                "pinst_list", "sinst_list",
1168                                "pip", "sip"],
1169                        dynamic=self.dynamic_fields,
1170                        selected=self.op.output_fields)
1171
1172     self.wanted = _GetWantedNodes(self, self.op.names)
1173
1174   def Exec(self, feedback_fn):
1175     """Computes the list of nodes and their attributes.
1176
1177     """
1178     nodenames = self.wanted
1179     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1180
1181     # begin data gathering
1182
1183     if self.dynamic_fields.intersection(self.op.output_fields):
1184       live_data = {}
1185       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1186       for name in nodenames:
1187         nodeinfo = node_data.get(name, None)
1188         if nodeinfo:
1189           live_data[name] = {
1190             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1191             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1192             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1193             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1194             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1195             "bootid": nodeinfo['bootid'],
1196             }
1197         else:
1198           live_data[name] = {}
1199     else:
1200       live_data = dict.fromkeys(nodenames, {})
1201
1202     node_to_primary = dict([(name, set()) for name in nodenames])
1203     node_to_secondary = dict([(name, set()) for name in nodenames])
1204
1205     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1206                              "sinst_cnt", "sinst_list"))
1207     if inst_fields & frozenset(self.op.output_fields):
1208       instancelist = self.cfg.GetInstanceList()
1209
1210       for instance_name in instancelist:
1211         inst = self.cfg.GetInstanceInfo(instance_name)
1212         if inst.primary_node in node_to_primary:
1213           node_to_primary[inst.primary_node].add(inst.name)
1214         for secnode in inst.secondary_nodes:
1215           if secnode in node_to_secondary:
1216             node_to_secondary[secnode].add(inst.name)
1217
1218     # end data gathering
1219
1220     output = []
1221     for node in nodelist:
1222       node_output = []
1223       for field in self.op.output_fields:
1224         if field == "name":
1225           val = node.name
1226         elif field == "pinst_list":
1227           val = list(node_to_primary[node.name])
1228         elif field == "sinst_list":
1229           val = list(node_to_secondary[node.name])
1230         elif field == "pinst_cnt":
1231           val = len(node_to_primary[node.name])
1232         elif field == "sinst_cnt":
1233           val = len(node_to_secondary[node.name])
1234         elif field == "pip":
1235           val = node.primary_ip
1236         elif field == "sip":
1237           val = node.secondary_ip
1238         elif field in self.dynamic_fields:
1239           val = live_data[node.name].get(field, None)
1240         else:
1241           raise errors.ParameterError(field)
1242         node_output.append(val)
1243       output.append(node_output)
1244
1245     return output
1246
1247
1248 class LUQueryNodeVolumes(NoHooksLU):
1249   """Logical unit for getting volumes on node(s).
1250
1251   """
1252   _OP_REQP = ["nodes", "output_fields"]
1253
1254   def CheckPrereq(self):
1255     """Check prerequisites.
1256
1257     This checks that the fields required are valid output fields.
1258
1259     """
1260     self.nodes = _GetWantedNodes(self, self.op.nodes)
1261
1262     _CheckOutputFields(static=["node"],
1263                        dynamic=["phys", "vg", "name", "size", "instance"],
1264                        selected=self.op.output_fields)
1265
1266
1267   def Exec(self, feedback_fn):
1268     """Computes the list of nodes and their attributes.
1269
1270     """
1271     nodenames = self.nodes
1272     volumes = rpc.call_node_volumes(nodenames)
1273
1274     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1275              in self.cfg.GetInstanceList()]
1276
1277     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1278
1279     output = []
1280     for node in nodenames:
1281       if node not in volumes or not volumes[node]:
1282         continue
1283
1284       node_vols = volumes[node][:]
1285       node_vols.sort(key=lambda vol: vol['dev'])
1286
1287       for vol in node_vols:
1288         node_output = []
1289         for field in self.op.output_fields:
1290           if field == "node":
1291             val = node
1292           elif field == "phys":
1293             val = vol['dev']
1294           elif field == "vg":
1295             val = vol['vg']
1296           elif field == "name":
1297             val = vol['name']
1298           elif field == "size":
1299             val = int(float(vol['size']))
1300           elif field == "instance":
1301             for inst in ilist:
1302               if node not in lv_by_node[inst]:
1303                 continue
1304               if vol['name'] in lv_by_node[inst][node]:
1305                 val = inst.name
1306                 break
1307             else:
1308               val = '-'
1309           else:
1310             raise errors.ParameterError(field)
1311           node_output.append(str(val))
1312
1313         output.append(node_output)
1314
1315     return output
1316
1317
1318 class LUAddNode(LogicalUnit):
1319   """Logical unit for adding node to the cluster.
1320
1321   """
1322   HPATH = "node-add"
1323   HTYPE = constants.HTYPE_NODE
1324   _OP_REQP = ["node_name"]
1325
1326   def BuildHooksEnv(self):
1327     """Build hooks env.
1328
1329     This will run on all nodes before, and on all nodes + the new node after.
1330
1331     """
1332     env = {
1333       "OP_TARGET": self.op.node_name,
1334       "NODE_NAME": self.op.node_name,
1335       "NODE_PIP": self.op.primary_ip,
1336       "NODE_SIP": self.op.secondary_ip,
1337       }
1338     nodes_0 = self.cfg.GetNodeList()
1339     nodes_1 = nodes_0 + [self.op.node_name, ]
1340     return env, nodes_0, nodes_1
1341
1342   def CheckPrereq(self):
1343     """Check prerequisites.
1344
1345     This checks:
1346      - the new node is not already in the config
1347      - it is resolvable
1348      - its parameters (single/dual homed) matches the cluster
1349
1350     Any errors are signalled by raising errors.OpPrereqError.
1351
1352     """
1353     node_name = self.op.node_name
1354     cfg = self.cfg
1355
1356     dns_data = utils.HostInfo(node_name)
1357
1358     node = dns_data.name
1359     primary_ip = self.op.primary_ip = dns_data.ip
1360     secondary_ip = getattr(self.op, "secondary_ip", None)
1361     if secondary_ip is None:
1362       secondary_ip = primary_ip
1363     if not utils.IsValidIP(secondary_ip):
1364       raise errors.OpPrereqError("Invalid secondary IP given")
1365     self.op.secondary_ip = secondary_ip
1366     node_list = cfg.GetNodeList()
1367     if node in node_list:
1368       raise errors.OpPrereqError("Node %s is already in the configuration"
1369                                  % node)
1370
1371     for existing_node_name in node_list:
1372       existing_node = cfg.GetNodeInfo(existing_node_name)
1373       if (existing_node.primary_ip == primary_ip or
1374           existing_node.secondary_ip == primary_ip or
1375           existing_node.primary_ip == secondary_ip or
1376           existing_node.secondary_ip == secondary_ip):
1377         raise errors.OpPrereqError("New node ip address(es) conflict with"
1378                                    " existing node %s" % existing_node.name)
1379
1380     # check that the type of the node (single versus dual homed) is the
1381     # same as for the master
1382     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1383     master_singlehomed = myself.secondary_ip == myself.primary_ip
1384     newbie_singlehomed = secondary_ip == primary_ip
1385     if master_singlehomed != newbie_singlehomed:
1386       if master_singlehomed:
1387         raise errors.OpPrereqError("The master has no private ip but the"
1388                                    " new node has one")
1389       else:
1390         raise errors.OpPrereqError("The master has a private ip but the"
1391                                    " new node doesn't have one")
1392
1393     # checks reachablity
1394     if not utils.TcpPing(utils.HostInfo().name,
1395                          primary_ip,
1396                          constants.DEFAULT_NODED_PORT):
1397       raise errors.OpPrereqError("Node not reachable by ping")
1398
1399     if not newbie_singlehomed:
1400       # check reachability from my secondary ip to newbie's secondary ip
1401       if not utils.TcpPing(myself.secondary_ip,
1402                            secondary_ip,
1403                            constants.DEFAULT_NODED_PORT):
1404         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1405                                    " based ping to noded port")
1406
1407     self.new_node = objects.Node(name=node,
1408                                  primary_ip=primary_ip,
1409                                  secondary_ip=secondary_ip)
1410
1411   def Exec(self, feedback_fn):
1412     """Adds the new node to the cluster.
1413
1414     """
1415     new_node = self.new_node
1416     node = new_node.name
1417
1418     # set up inter-node password and certificate and restarts the node daemon
1419     gntpass = self.sstore.GetNodeDaemonPassword()
1420     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1421       raise errors.OpExecError("ganeti password corruption detected")
1422     f = open(constants.SSL_CERT_FILE)
1423     try:
1424       gntpem = f.read(8192)
1425     finally:
1426       f.close()
1427     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1428     # so we use this to detect an invalid certificate; as long as the
1429     # cert doesn't contain this, the here-document will be correctly
1430     # parsed by the shell sequence below
1431     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1432       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1433     if not gntpem.endswith("\n"):
1434       raise errors.OpExecError("PEM must end with newline")
1435     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1436
1437     # and then connect with ssh to set password and start ganeti-noded
1438     # note that all the below variables are sanitized at this point,
1439     # either by being constants or by the checks above
1440     ss = self.sstore
1441     mycommand = ("umask 077 && "
1442                  "echo '%s' > '%s' && "
1443                  "cat > '%s' << '!EOF.' && \n"
1444                  "%s!EOF.\n%s restart" %
1445                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1446                   constants.SSL_CERT_FILE, gntpem,
1447                   constants.NODE_INITD_SCRIPT))
1448
1449     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1450     if result.failed:
1451       raise errors.OpExecError("Remote command on node %s, error: %s,"
1452                                " output: %s" %
1453                                (node, result.fail_reason, result.output))
1454
1455     # check connectivity
1456     time.sleep(4)
1457
1458     result = rpc.call_version([node])[node]
1459     if result:
1460       if constants.PROTOCOL_VERSION == result:
1461         logger.Info("communication to node %s fine, sw version %s match" %
1462                     (node, result))
1463       else:
1464         raise errors.OpExecError("Version mismatch master version %s,"
1465                                  " node version %s" %
1466                                  (constants.PROTOCOL_VERSION, result))
1467     else:
1468       raise errors.OpExecError("Cannot get version from the new node")
1469
1470     # setup ssh on node
1471     logger.Info("copy ssh key to node %s" % node)
1472     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1473     keyarray = []
1474     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1475                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1476                 priv_key, pub_key]
1477
1478     for i in keyfiles:
1479       f = open(i, 'r')
1480       try:
1481         keyarray.append(f.read())
1482       finally:
1483         f.close()
1484
1485     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1486                                keyarray[3], keyarray[4], keyarray[5])
1487
1488     if not result:
1489       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1490
1491     # Add node to our /etc/hosts, and add key to known_hosts
1492     _AddHostToEtcHosts(new_node.name)
1493
1494     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1495                       self.cfg.GetHostKey())
1496
1497     if new_node.secondary_ip != new_node.primary_ip:
1498       if not rpc.call_node_tcp_ping(new_node.name,
1499                                     constants.LOCALHOST_IP_ADDRESS,
1500                                     new_node.secondary_ip,
1501                                     constants.DEFAULT_NODED_PORT,
1502                                     10, False):
1503         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1504                                  " you gave (%s). Please fix and re-run this"
1505                                  " command." % new_node.secondary_ip)
1506
1507     success, msg = ssh.VerifyNodeHostname(node)
1508     if not success:
1509       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1510                                " than the one the resolver gives: %s."
1511                                " Please fix and re-run this command." %
1512                                (node, msg))
1513
1514     # Distribute updated /etc/hosts and known_hosts to all nodes,
1515     # including the node just added
1516     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1517     dist_nodes = self.cfg.GetNodeList() + [node]
1518     if myself.name in dist_nodes:
1519       dist_nodes.remove(myself.name)
1520
1521     logger.Debug("Copying hosts and known_hosts to all nodes")
1522     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1523       result = rpc.call_upload_file(dist_nodes, fname)
1524       for to_node in dist_nodes:
1525         if not result[to_node]:
1526           logger.Error("copy of file %s to node %s failed" %
1527                        (fname, to_node))
1528
1529     to_copy = ss.GetFileList()
1530     for fname in to_copy:
1531       if not ssh.CopyFileToNode(node, fname):
1532         logger.Error("could not copy file %s to node %s" % (fname, node))
1533
1534     logger.Info("adding node %s to cluster.conf" % node)
1535     self.cfg.AddNode(new_node)
1536
1537
1538 class LUMasterFailover(LogicalUnit):
1539   """Failover the master node to the current node.
1540
1541   This is a special LU in that it must run on a non-master node.
1542
1543   """
1544   HPATH = "master-failover"
1545   HTYPE = constants.HTYPE_CLUSTER
1546   REQ_MASTER = False
1547   _OP_REQP = []
1548
1549   def BuildHooksEnv(self):
1550     """Build hooks env.
1551
1552     This will run on the new master only in the pre phase, and on all
1553     the nodes in the post phase.
1554
1555     """
1556     env = {
1557       "OP_TARGET": self.new_master,
1558       "NEW_MASTER": self.new_master,
1559       "OLD_MASTER": self.old_master,
1560       }
1561     return env, [self.new_master], self.cfg.GetNodeList()
1562
1563   def CheckPrereq(self):
1564     """Check prerequisites.
1565
1566     This checks that we are not already the master.
1567
1568     """
1569     self.new_master = utils.HostInfo().name
1570     self.old_master = self.sstore.GetMasterNode()
1571
1572     if self.old_master == self.new_master:
1573       raise errors.OpPrereqError("This commands must be run on the node"
1574                                  " where you want the new master to be."
1575                                  " %s is already the master" %
1576                                  self.old_master)
1577
1578   def Exec(self, feedback_fn):
1579     """Failover the master node.
1580
1581     This command, when run on a non-master node, will cause the current
1582     master to cease being master, and the non-master to become new
1583     master.
1584
1585     """
1586     #TODO: do not rely on gethostname returning the FQDN
1587     logger.Info("setting master to %s, old master: %s" %
1588                 (self.new_master, self.old_master))
1589
1590     if not rpc.call_node_stop_master(self.old_master):
1591       logger.Error("could disable the master role on the old master"
1592                    " %s, please disable manually" % self.old_master)
1593
1594     ss = self.sstore
1595     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1596     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1597                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1598       logger.Error("could not distribute the new simple store master file"
1599                    " to the other nodes, please check.")
1600
1601     if not rpc.call_node_start_master(self.new_master):
1602       logger.Error("could not start the master role on the new master"
1603                    " %s, please check" % self.new_master)
1604       feedback_fn("Error in activating the master IP on the new master,"
1605                   " please fix manually.")
1606
1607
1608
1609 class LUQueryClusterInfo(NoHooksLU):
1610   """Query cluster configuration.
1611
1612   """
1613   _OP_REQP = []
1614   REQ_MASTER = False
1615
1616   def CheckPrereq(self):
1617     """No prerequsites needed for this LU.
1618
1619     """
1620     pass
1621
1622   def Exec(self, feedback_fn):
1623     """Return cluster config.
1624
1625     """
1626     result = {
1627       "name": self.sstore.GetClusterName(),
1628       "software_version": constants.RELEASE_VERSION,
1629       "protocol_version": constants.PROTOCOL_VERSION,
1630       "config_version": constants.CONFIG_VERSION,
1631       "os_api_version": constants.OS_API_VERSION,
1632       "export_version": constants.EXPORT_VERSION,
1633       "master": self.sstore.GetMasterNode(),
1634       "architecture": (platform.architecture()[0], platform.machine()),
1635       }
1636
1637     return result
1638
1639
1640 class LUClusterCopyFile(NoHooksLU):
1641   """Copy file to cluster.
1642
1643   """
1644   _OP_REQP = ["nodes", "filename"]
1645
1646   def CheckPrereq(self):
1647     """Check prerequisites.
1648
1649     It should check that the named file exists and that the given list
1650     of nodes is valid.
1651
1652     """
1653     if not os.path.exists(self.op.filename):
1654       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1655
1656     self.nodes = _GetWantedNodes(self, self.op.nodes)
1657
1658   def Exec(self, feedback_fn):
1659     """Copy a file from master to some nodes.
1660
1661     Args:
1662       opts - class with options as members
1663       args - list containing a single element, the file name
1664     Opts used:
1665       nodes - list containing the name of target nodes; if empty, all nodes
1666
1667     """
1668     filename = self.op.filename
1669
1670     myname = utils.HostInfo().name
1671
1672     for node in self.nodes:
1673       if node == myname:
1674         continue
1675       if not ssh.CopyFileToNode(node, filename):
1676         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1677
1678
1679 class LUDumpClusterConfig(NoHooksLU):
1680   """Return a text-representation of the cluster-config.
1681
1682   """
1683   _OP_REQP = []
1684
1685   def CheckPrereq(self):
1686     """No prerequisites.
1687
1688     """
1689     pass
1690
1691   def Exec(self, feedback_fn):
1692     """Dump a representation of the cluster config to the standard output.
1693
1694     """
1695     return self.cfg.DumpConfig()
1696
1697
1698 class LURunClusterCommand(NoHooksLU):
1699   """Run a command on some nodes.
1700
1701   """
1702   _OP_REQP = ["command", "nodes"]
1703
1704   def CheckPrereq(self):
1705     """Check prerequisites.
1706
1707     It checks that the given list of nodes is valid.
1708
1709     """
1710     self.nodes = _GetWantedNodes(self, self.op.nodes)
1711
1712   def Exec(self, feedback_fn):
1713     """Run a command on some nodes.
1714
1715     """
1716     data = []
1717     for node in self.nodes:
1718       result = ssh.SSHCall(node, "root", self.op.command)
1719       data.append((node, result.output, result.exit_code))
1720
1721     return data
1722
1723
1724 class LUActivateInstanceDisks(NoHooksLU):
1725   """Bring up an instance's disks.
1726
1727   """
1728   _OP_REQP = ["instance_name"]
1729
1730   def CheckPrereq(self):
1731     """Check prerequisites.
1732
1733     This checks that the instance is in the cluster.
1734
1735     """
1736     instance = self.cfg.GetInstanceInfo(
1737       self.cfg.ExpandInstanceName(self.op.instance_name))
1738     if instance is None:
1739       raise errors.OpPrereqError("Instance '%s' not known" %
1740                                  self.op.instance_name)
1741     self.instance = instance
1742
1743
1744   def Exec(self, feedback_fn):
1745     """Activate the disks.
1746
1747     """
1748     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1749     if not disks_ok:
1750       raise errors.OpExecError("Cannot activate block devices")
1751
1752     return disks_info
1753
1754
1755 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1756   """Prepare the block devices for an instance.
1757
1758   This sets up the block devices on all nodes.
1759
1760   Args:
1761     instance: a ganeti.objects.Instance object
1762     ignore_secondaries: if true, errors on secondary nodes won't result
1763                         in an error return from the function
1764
1765   Returns:
1766     false if the operation failed
1767     list of (host, instance_visible_name, node_visible_name) if the operation
1768          suceeded with the mapping from node devices to instance devices
1769   """
1770   device_info = []
1771   disks_ok = True
1772   for inst_disk in instance.disks:
1773     master_result = None
1774     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1775       cfg.SetDiskID(node_disk, node)
1776       is_primary = node == instance.primary_node
1777       result = rpc.call_blockdev_assemble(node, node_disk,
1778                                           instance.name, is_primary)
1779       if not result:
1780         logger.Error("could not prepare block device %s on node %s"
1781                      " (is_primary=%s)" %
1782                      (inst_disk.iv_name, node, is_primary))
1783         if is_primary or not ignore_secondaries:
1784           disks_ok = False
1785       if is_primary:
1786         master_result = result
1787     device_info.append((instance.primary_node, inst_disk.iv_name,
1788                         master_result))
1789
1790   # leave the disks configured for the primary node
1791   # this is a workaround that would be fixed better by
1792   # improving the logical/physical id handling
1793   for disk in instance.disks:
1794     cfg.SetDiskID(disk, instance.primary_node)
1795
1796   return disks_ok, device_info
1797
1798
1799 def _StartInstanceDisks(cfg, instance, force):
1800   """Start the disks of an instance.
1801
1802   """
1803   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1804                                            ignore_secondaries=force)
1805   if not disks_ok:
1806     _ShutdownInstanceDisks(instance, cfg)
1807     if force is not None and not force:
1808       logger.Error("If the message above refers to a secondary node,"
1809                    " you can retry the operation using '--force'.")
1810     raise errors.OpExecError("Disk consistency error")
1811
1812
1813 class LUDeactivateInstanceDisks(NoHooksLU):
1814   """Shutdown an instance's disks.
1815
1816   """
1817   _OP_REQP = ["instance_name"]
1818
1819   def CheckPrereq(self):
1820     """Check prerequisites.
1821
1822     This checks that the instance is in the cluster.
1823
1824     """
1825     instance = self.cfg.GetInstanceInfo(
1826       self.cfg.ExpandInstanceName(self.op.instance_name))
1827     if instance is None:
1828       raise errors.OpPrereqError("Instance '%s' not known" %
1829                                  self.op.instance_name)
1830     self.instance = instance
1831
1832   def Exec(self, feedback_fn):
1833     """Deactivate the disks
1834
1835     """
1836     instance = self.instance
1837     ins_l = rpc.call_instance_list([instance.primary_node])
1838     ins_l = ins_l[instance.primary_node]
1839     if not type(ins_l) is list:
1840       raise errors.OpExecError("Can't contact node '%s'" %
1841                                instance.primary_node)
1842
1843     if self.instance.name in ins_l:
1844       raise errors.OpExecError("Instance is running, can't shutdown"
1845                                " block devices.")
1846
1847     _ShutdownInstanceDisks(instance, self.cfg)
1848
1849
1850 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1851   """Shutdown block devices of an instance.
1852
1853   This does the shutdown on all nodes of the instance.
1854
1855   If the ignore_primary is false, errors on the primary node are
1856   ignored.
1857
1858   """
1859   result = True
1860   for disk in instance.disks:
1861     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1862       cfg.SetDiskID(top_disk, node)
1863       if not rpc.call_blockdev_shutdown(node, top_disk):
1864         logger.Error("could not shutdown block device %s on node %s" %
1865                      (disk.iv_name, node))
1866         if not ignore_primary or node != instance.primary_node:
1867           result = False
1868   return result
1869
1870
1871 class LUStartupInstance(LogicalUnit):
1872   """Starts an instance.
1873
1874   """
1875   HPATH = "instance-start"
1876   HTYPE = constants.HTYPE_INSTANCE
1877   _OP_REQP = ["instance_name", "force"]
1878
1879   def BuildHooksEnv(self):
1880     """Build hooks env.
1881
1882     This runs on master, primary and secondary nodes of the instance.
1883
1884     """
1885     env = {
1886       "FORCE": self.op.force,
1887       }
1888     env.update(_BuildInstanceHookEnvByObject(self.instance))
1889     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1890           list(self.instance.secondary_nodes))
1891     return env, nl, nl
1892
1893   def CheckPrereq(self):
1894     """Check prerequisites.
1895
1896     This checks that the instance is in the cluster.
1897
1898     """
1899     instance = self.cfg.GetInstanceInfo(
1900       self.cfg.ExpandInstanceName(self.op.instance_name))
1901     if instance is None:
1902       raise errors.OpPrereqError("Instance '%s' not known" %
1903                                  self.op.instance_name)
1904
1905     # check bridges existance
1906     _CheckInstanceBridgesExist(instance)
1907
1908     self.instance = instance
1909     self.op.instance_name = instance.name
1910
1911   def Exec(self, feedback_fn):
1912     """Start the instance.
1913
1914     """
1915     instance = self.instance
1916     force = self.op.force
1917     extra_args = getattr(self.op, "extra_args", "")
1918
1919     node_current = instance.primary_node
1920
1921     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1922     if not nodeinfo:
1923       raise errors.OpExecError("Could not contact node %s for infos" %
1924                                (node_current))
1925
1926     freememory = nodeinfo[node_current]['memory_free']
1927     memory = instance.memory
1928     if memory > freememory:
1929       raise errors.OpExecError("Not enough memory to start instance"
1930                                " %s on node %s"
1931                                " needed %s MiB, available %s MiB" %
1932                                (instance.name, node_current, memory,
1933                                 freememory))
1934
1935     _StartInstanceDisks(self.cfg, instance, force)
1936
1937     if not rpc.call_instance_start(node_current, instance, extra_args):
1938       _ShutdownInstanceDisks(instance, self.cfg)
1939       raise errors.OpExecError("Could not start instance")
1940
1941     self.cfg.MarkInstanceUp(instance.name)
1942
1943
1944 class LURebootInstance(LogicalUnit):
1945   """Reboot an instance.
1946
1947   """
1948   HPATH = "instance-reboot"
1949   HTYPE = constants.HTYPE_INSTANCE
1950   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
1951
1952   def BuildHooksEnv(self):
1953     """Build hooks env.
1954
1955     This runs on master, primary and secondary nodes of the instance.
1956
1957     """
1958     env = {
1959       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
1960       }
1961     env.update(_BuildInstanceHookEnvByObject(self.instance))
1962     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1963           list(self.instance.secondary_nodes))
1964     return env, nl, nl
1965
1966   def CheckPrereq(self):
1967     """Check prerequisites.
1968
1969     This checks that the instance is in the cluster.
1970
1971     """
1972     instance = self.cfg.GetInstanceInfo(
1973       self.cfg.ExpandInstanceName(self.op.instance_name))
1974     if instance is None:
1975       raise errors.OpPrereqError("Instance '%s' not known" %
1976                                  self.op.instance_name)
1977
1978     # check bridges existance
1979     _CheckInstanceBridgesExist(instance)
1980
1981     self.instance = instance
1982     self.op.instance_name = instance.name
1983
1984   def Exec(self, feedback_fn):
1985     """Reboot the instance.
1986
1987     """
1988     instance = self.instance
1989     ignore_secondaries = self.op.ignore_secondaries
1990     reboot_type = self.op.reboot_type
1991     extra_args = getattr(self.op, "extra_args", "")
1992
1993     node_current = instance.primary_node
1994
1995     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
1996                            constants.INSTANCE_REBOOT_HARD,
1997                            constants.INSTANCE_REBOOT_FULL]:
1998       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
1999                                   (constants.INSTANCE_REBOOT_SOFT,
2000                                    constants.INSTANCE_REBOOT_HARD,
2001                                    constants.INSTANCE_REBOOT_FULL))
2002
2003     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2004                        constants.INSTANCE_REBOOT_HARD]:
2005       if not rpc.call_instance_reboot(node_current, instance,
2006                                       reboot_type, extra_args):
2007         raise errors.OpExecError("Could not reboot instance")
2008     else:
2009       if not rpc.call_instance_shutdown(node_current, instance):
2010         raise errors.OpExecError("could not shutdown instance for full reboot")
2011       _ShutdownInstanceDisks(instance, self.cfg)
2012       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2013       if not rpc.call_instance_start(node_current, instance, extra_args):
2014         _ShutdownInstanceDisks(instance, self.cfg)
2015         raise errors.OpExecError("Could not start instance for full reboot")
2016
2017     self.cfg.MarkInstanceUp(instance.name)
2018
2019
2020 class LUShutdownInstance(LogicalUnit):
2021   """Shutdown an instance.
2022
2023   """
2024   HPATH = "instance-stop"
2025   HTYPE = constants.HTYPE_INSTANCE
2026   _OP_REQP = ["instance_name"]
2027
2028   def BuildHooksEnv(self):
2029     """Build hooks env.
2030
2031     This runs on master, primary and secondary nodes of the instance.
2032
2033     """
2034     env = _BuildInstanceHookEnvByObject(self.instance)
2035     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2036           list(self.instance.secondary_nodes))
2037     return env, nl, nl
2038
2039   def CheckPrereq(self):
2040     """Check prerequisites.
2041
2042     This checks that the instance is in the cluster.
2043
2044     """
2045     instance = self.cfg.GetInstanceInfo(
2046       self.cfg.ExpandInstanceName(self.op.instance_name))
2047     if instance is None:
2048       raise errors.OpPrereqError("Instance '%s' not known" %
2049                                  self.op.instance_name)
2050     self.instance = instance
2051
2052   def Exec(self, feedback_fn):
2053     """Shutdown the instance.
2054
2055     """
2056     instance = self.instance
2057     node_current = instance.primary_node
2058     if not rpc.call_instance_shutdown(node_current, instance):
2059       logger.Error("could not shutdown instance")
2060
2061     self.cfg.MarkInstanceDown(instance.name)
2062     _ShutdownInstanceDisks(instance, self.cfg)
2063
2064
2065 class LUReinstallInstance(LogicalUnit):
2066   """Reinstall an instance.
2067
2068   """
2069   HPATH = "instance-reinstall"
2070   HTYPE = constants.HTYPE_INSTANCE
2071   _OP_REQP = ["instance_name"]
2072
2073   def BuildHooksEnv(self):
2074     """Build hooks env.
2075
2076     This runs on master, primary and secondary nodes of the instance.
2077
2078     """
2079     env = _BuildInstanceHookEnvByObject(self.instance)
2080     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2081           list(self.instance.secondary_nodes))
2082     return env, nl, nl
2083
2084   def CheckPrereq(self):
2085     """Check prerequisites.
2086
2087     This checks that the instance is in the cluster and is not running.
2088
2089     """
2090     instance = self.cfg.GetInstanceInfo(
2091       self.cfg.ExpandInstanceName(self.op.instance_name))
2092     if instance is None:
2093       raise errors.OpPrereqError("Instance '%s' not known" %
2094                                  self.op.instance_name)
2095     if instance.disk_template == constants.DT_DISKLESS:
2096       raise errors.OpPrereqError("Instance '%s' has no disks" %
2097                                  self.op.instance_name)
2098     if instance.status != "down":
2099       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2100                                  self.op.instance_name)
2101     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2102     if remote_info:
2103       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2104                                  (self.op.instance_name,
2105                                   instance.primary_node))
2106
2107     self.op.os_type = getattr(self.op, "os_type", None)
2108     if self.op.os_type is not None:
2109       # OS verification
2110       pnode = self.cfg.GetNodeInfo(
2111         self.cfg.ExpandNodeName(instance.primary_node))
2112       if pnode is None:
2113         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2114                                    self.op.pnode)
2115       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2116       if not os_obj:
2117         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2118                                    " primary node"  % self.op.os_type)
2119
2120     self.instance = instance
2121
2122   def Exec(self, feedback_fn):
2123     """Reinstall the instance.
2124
2125     """
2126     inst = self.instance
2127
2128     if self.op.os_type is not None:
2129       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2130       inst.os = self.op.os_type
2131       self.cfg.AddInstance(inst)
2132
2133     _StartInstanceDisks(self.cfg, inst, None)
2134     try:
2135       feedback_fn("Running the instance OS create scripts...")
2136       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2137         raise errors.OpExecError("Could not install OS for instance %s"
2138                                  " on node %s" %
2139                                  (inst.name, inst.primary_node))
2140     finally:
2141       _ShutdownInstanceDisks(inst, self.cfg)
2142
2143
2144 class LURenameInstance(LogicalUnit):
2145   """Rename an instance.
2146
2147   """
2148   HPATH = "instance-rename"
2149   HTYPE = constants.HTYPE_INSTANCE
2150   _OP_REQP = ["instance_name", "new_name"]
2151
2152   def BuildHooksEnv(self):
2153     """Build hooks env.
2154
2155     This runs on master, primary and secondary nodes of the instance.
2156
2157     """
2158     env = _BuildInstanceHookEnvByObject(self.instance)
2159     env["INSTANCE_NEW_NAME"] = self.op.new_name
2160     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2161           list(self.instance.secondary_nodes))
2162     return env, nl, nl
2163
2164   def CheckPrereq(self):
2165     """Check prerequisites.
2166
2167     This checks that the instance is in the cluster and is not running.
2168
2169     """
2170     instance = self.cfg.GetInstanceInfo(
2171       self.cfg.ExpandInstanceName(self.op.instance_name))
2172     if instance is None:
2173       raise errors.OpPrereqError("Instance '%s' not known" %
2174                                  self.op.instance_name)
2175     if instance.status != "down":
2176       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2177                                  self.op.instance_name)
2178     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2179     if remote_info:
2180       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2181                                  (self.op.instance_name,
2182                                   instance.primary_node))
2183     self.instance = instance
2184
2185     # new name verification
2186     name_info = utils.HostInfo(self.op.new_name)
2187
2188     self.op.new_name = new_name = name_info.name
2189     if not getattr(self.op, "ignore_ip", False):
2190       command = ["fping", "-q", name_info.ip]
2191       result = utils.RunCmd(command)
2192       if not result.failed:
2193         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2194                                    (name_info.ip, new_name))
2195
2196
2197   def Exec(self, feedback_fn):
2198     """Reinstall the instance.
2199
2200     """
2201     inst = self.instance
2202     old_name = inst.name
2203
2204     self.cfg.RenameInstance(inst.name, self.op.new_name)
2205
2206     # re-read the instance from the configuration after rename
2207     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2208
2209     _StartInstanceDisks(self.cfg, inst, None)
2210     try:
2211       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2212                                           "sda", "sdb"):
2213         msg = ("Could run OS rename script for instance %s on node %s (but the"
2214                " instance has been renamed in Ganeti)" %
2215                (inst.name, inst.primary_node))
2216         logger.Error(msg)
2217     finally:
2218       _ShutdownInstanceDisks(inst, self.cfg)
2219
2220
2221 class LURemoveInstance(LogicalUnit):
2222   """Remove an instance.
2223
2224   """
2225   HPATH = "instance-remove"
2226   HTYPE = constants.HTYPE_INSTANCE
2227   _OP_REQP = ["instance_name"]
2228
2229   def BuildHooksEnv(self):
2230     """Build hooks env.
2231
2232     This runs on master, primary and secondary nodes of the instance.
2233
2234     """
2235     env = _BuildInstanceHookEnvByObject(self.instance)
2236     nl = [self.sstore.GetMasterNode()]
2237     return env, nl, nl
2238
2239   def CheckPrereq(self):
2240     """Check prerequisites.
2241
2242     This checks that the instance is in the cluster.
2243
2244     """
2245     instance = self.cfg.GetInstanceInfo(
2246       self.cfg.ExpandInstanceName(self.op.instance_name))
2247     if instance is None:
2248       raise errors.OpPrereqError("Instance '%s' not known" %
2249                                  self.op.instance_name)
2250     self.instance = instance
2251
2252   def Exec(self, feedback_fn):
2253     """Remove the instance.
2254
2255     """
2256     instance = self.instance
2257     logger.Info("shutting down instance %s on node %s" %
2258                 (instance.name, instance.primary_node))
2259
2260     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2261       if self.op.ignore_failures:
2262         feedback_fn("Warning: can't shutdown instance")
2263       else:
2264         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2265                                  (instance.name, instance.primary_node))
2266
2267     logger.Info("removing block devices for instance %s" % instance.name)
2268
2269     if not _RemoveDisks(instance, self.cfg):
2270       if self.op.ignore_failures:
2271         feedback_fn("Warning: can't remove instance's disks")
2272       else:
2273         raise errors.OpExecError("Can't remove instance's disks")
2274
2275     logger.Info("removing instance %s out of cluster config" % instance.name)
2276
2277     self.cfg.RemoveInstance(instance.name)
2278
2279
2280 class LUQueryInstances(NoHooksLU):
2281   """Logical unit for querying instances.
2282
2283   """
2284   _OP_REQP = ["output_fields", "names"]
2285
2286   def CheckPrereq(self):
2287     """Check prerequisites.
2288
2289     This checks that the fields required are valid output fields.
2290
2291     """
2292     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2293     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2294                                "admin_state", "admin_ram",
2295                                "disk_template", "ip", "mac", "bridge",
2296                                "sda_size", "sdb_size"],
2297                        dynamic=self.dynamic_fields,
2298                        selected=self.op.output_fields)
2299
2300     self.wanted = _GetWantedInstances(self, self.op.names)
2301
2302   def Exec(self, feedback_fn):
2303     """Computes the list of nodes and their attributes.
2304
2305     """
2306     instance_names = self.wanted
2307     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2308                      in instance_names]
2309
2310     # begin data gathering
2311
2312     nodes = frozenset([inst.primary_node for inst in instance_list])
2313
2314     bad_nodes = []
2315     if self.dynamic_fields.intersection(self.op.output_fields):
2316       live_data = {}
2317       node_data = rpc.call_all_instances_info(nodes)
2318       for name in nodes:
2319         result = node_data[name]
2320         if result:
2321           live_data.update(result)
2322         elif result == False:
2323           bad_nodes.append(name)
2324         # else no instance is alive
2325     else:
2326       live_data = dict([(name, {}) for name in instance_names])
2327
2328     # end data gathering
2329
2330     output = []
2331     for instance in instance_list:
2332       iout = []
2333       for field in self.op.output_fields:
2334         if field == "name":
2335           val = instance.name
2336         elif field == "os":
2337           val = instance.os
2338         elif field == "pnode":
2339           val = instance.primary_node
2340         elif field == "snodes":
2341           val = list(instance.secondary_nodes)
2342         elif field == "admin_state":
2343           val = (instance.status != "down")
2344         elif field == "oper_state":
2345           if instance.primary_node in bad_nodes:
2346             val = None
2347           else:
2348             val = bool(live_data.get(instance.name))
2349         elif field == "admin_ram":
2350           val = instance.memory
2351         elif field == "oper_ram":
2352           if instance.primary_node in bad_nodes:
2353             val = None
2354           elif instance.name in live_data:
2355             val = live_data[instance.name].get("memory", "?")
2356           else:
2357             val = "-"
2358         elif field == "disk_template":
2359           val = instance.disk_template
2360         elif field == "ip":
2361           val = instance.nics[0].ip
2362         elif field == "bridge":
2363           val = instance.nics[0].bridge
2364         elif field == "mac":
2365           val = instance.nics[0].mac
2366         elif field == "sda_size" or field == "sdb_size":
2367           disk = instance.FindDisk(field[:3])
2368           if disk is None:
2369             val = None
2370           else:
2371             val = disk.size
2372         else:
2373           raise errors.ParameterError(field)
2374         iout.append(val)
2375       output.append(iout)
2376
2377     return output
2378
2379
2380 class LUFailoverInstance(LogicalUnit):
2381   """Failover an instance.
2382
2383   """
2384   HPATH = "instance-failover"
2385   HTYPE = constants.HTYPE_INSTANCE
2386   _OP_REQP = ["instance_name", "ignore_consistency"]
2387
2388   def BuildHooksEnv(self):
2389     """Build hooks env.
2390
2391     This runs on master, primary and secondary nodes of the instance.
2392
2393     """
2394     env = {
2395       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2396       }
2397     env.update(_BuildInstanceHookEnvByObject(self.instance))
2398     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2399     return env, nl, nl
2400
2401   def CheckPrereq(self):
2402     """Check prerequisites.
2403
2404     This checks that the instance is in the cluster.
2405
2406     """
2407     instance = self.cfg.GetInstanceInfo(
2408       self.cfg.ExpandInstanceName(self.op.instance_name))
2409     if instance is None:
2410       raise errors.OpPrereqError("Instance '%s' not known" %
2411                                  self.op.instance_name)
2412
2413     if instance.disk_template not in constants.DTS_NET_MIRROR:
2414       raise errors.OpPrereqError("Instance's disk layout is not"
2415                                  " network mirrored, cannot failover.")
2416
2417     secondary_nodes = instance.secondary_nodes
2418     if not secondary_nodes:
2419       raise errors.ProgrammerError("no secondary node but using "
2420                                    "DT_REMOTE_RAID1 template")
2421
2422     # check memory requirements on the secondary node
2423     target_node = secondary_nodes[0]
2424     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2425     info = nodeinfo.get(target_node, None)
2426     if not info:
2427       raise errors.OpPrereqError("Cannot get current information"
2428                                  " from node '%s'" % nodeinfo)
2429     if instance.memory > info['memory_free']:
2430       raise errors.OpPrereqError("Not enough memory on target node %s."
2431                                  " %d MB available, %d MB required" %
2432                                  (target_node, info['memory_free'],
2433                                   instance.memory))
2434
2435     # check bridge existance
2436     brlist = [nic.bridge for nic in instance.nics]
2437     if not rpc.call_bridges_exist(target_node, brlist):
2438       raise errors.OpPrereqError("One or more target bridges %s does not"
2439                                  " exist on destination node '%s'" %
2440                                  (brlist, target_node))
2441
2442     self.instance = instance
2443
2444   def Exec(self, feedback_fn):
2445     """Failover an instance.
2446
2447     The failover is done by shutting it down on its present node and
2448     starting it on the secondary.
2449
2450     """
2451     instance = self.instance
2452
2453     source_node = instance.primary_node
2454     target_node = instance.secondary_nodes[0]
2455
2456     feedback_fn("* checking disk consistency between source and target")
2457     for dev in instance.disks:
2458       # for remote_raid1, these are md over drbd
2459       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2460         if not self.op.ignore_consistency:
2461           raise errors.OpExecError("Disk %s is degraded on target node,"
2462                                    " aborting failover." % dev.iv_name)
2463
2464     feedback_fn("* checking target node resource availability")
2465     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2466
2467     if not nodeinfo:
2468       raise errors.OpExecError("Could not contact target node %s." %
2469                                target_node)
2470
2471     free_memory = int(nodeinfo[target_node]['memory_free'])
2472     memory = instance.memory
2473     if memory > free_memory:
2474       raise errors.OpExecError("Not enough memory to create instance %s on"
2475                                " node %s. needed %s MiB, available %s MiB" %
2476                                (instance.name, target_node, memory,
2477                                 free_memory))
2478
2479     feedback_fn("* shutting down instance on source node")
2480     logger.Info("Shutting down instance %s on node %s" %
2481                 (instance.name, source_node))
2482
2483     if not rpc.call_instance_shutdown(source_node, instance):
2484       if self.op.ignore_consistency:
2485         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2486                      " anyway. Please make sure node %s is down"  %
2487                      (instance.name, source_node, source_node))
2488       else:
2489         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2490                                  (instance.name, source_node))
2491
2492     feedback_fn("* deactivating the instance's disks on source node")
2493     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2494       raise errors.OpExecError("Can't shut down the instance's disks.")
2495
2496     instance.primary_node = target_node
2497     # distribute new instance config to the other nodes
2498     self.cfg.AddInstance(instance)
2499
2500     feedback_fn("* activating the instance's disks on target node")
2501     logger.Info("Starting instance %s on node %s" %
2502                 (instance.name, target_node))
2503
2504     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2505                                              ignore_secondaries=True)
2506     if not disks_ok:
2507       _ShutdownInstanceDisks(instance, self.cfg)
2508       raise errors.OpExecError("Can't activate the instance's disks")
2509
2510     feedback_fn("* starting the instance on the target node")
2511     if not rpc.call_instance_start(target_node, instance, None):
2512       _ShutdownInstanceDisks(instance, self.cfg)
2513       raise errors.OpExecError("Could not start instance %s on node %s." %
2514                                (instance.name, target_node))
2515
2516
2517 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2518   """Create a tree of block devices on the primary node.
2519
2520   This always creates all devices.
2521
2522   """
2523   if device.children:
2524     for child in device.children:
2525       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2526         return False
2527
2528   cfg.SetDiskID(device, node)
2529   new_id = rpc.call_blockdev_create(node, device, device.size,
2530                                     instance.name, True, info)
2531   if not new_id:
2532     return False
2533   if device.physical_id is None:
2534     device.physical_id = new_id
2535   return True
2536
2537
2538 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2539   """Create a tree of block devices on a secondary node.
2540
2541   If this device type has to be created on secondaries, create it and
2542   all its children.
2543
2544   If not, just recurse to children keeping the same 'force' value.
2545
2546   """
2547   if device.CreateOnSecondary():
2548     force = True
2549   if device.children:
2550     for child in device.children:
2551       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2552                                         child, force, info):
2553         return False
2554
2555   if not force:
2556     return True
2557   cfg.SetDiskID(device, node)
2558   new_id = rpc.call_blockdev_create(node, device, device.size,
2559                                     instance.name, False, info)
2560   if not new_id:
2561     return False
2562   if device.physical_id is None:
2563     device.physical_id = new_id
2564   return True
2565
2566
2567 def _GenerateUniqueNames(cfg, exts):
2568   """Generate a suitable LV name.
2569
2570   This will generate a logical volume name for the given instance.
2571
2572   """
2573   results = []
2574   for val in exts:
2575     new_id = cfg.GenerateUniqueID()
2576     results.append("%s%s" % (new_id, val))
2577   return results
2578
2579
2580 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2581   """Generate a drbd device complete with its children.
2582
2583   """
2584   port = cfg.AllocatePort()
2585   vgname = cfg.GetVGName()
2586   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2587                           logical_id=(vgname, names[0]))
2588   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2589                           logical_id=(vgname, names[1]))
2590   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2591                           logical_id = (primary, secondary, port),
2592                           children = [dev_data, dev_meta])
2593   return drbd_dev
2594
2595
2596 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2597   """Generate a drbd8 device complete with its children.
2598
2599   """
2600   port = cfg.AllocatePort()
2601   vgname = cfg.GetVGName()
2602   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2603                           logical_id=(vgname, names[0]))
2604   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2605                           logical_id=(vgname, names[1]))
2606   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2607                           logical_id = (primary, secondary, port),
2608                           children = [dev_data, dev_meta],
2609                           iv_name=iv_name)
2610   return drbd_dev
2611
2612 def _GenerateDiskTemplate(cfg, template_name,
2613                           instance_name, primary_node,
2614                           secondary_nodes, disk_sz, swap_sz):
2615   """Generate the entire disk layout for a given template type.
2616
2617   """
2618   #TODO: compute space requirements
2619
2620   vgname = cfg.GetVGName()
2621   if template_name == "diskless":
2622     disks = []
2623   elif template_name == "plain":
2624     if len(secondary_nodes) != 0:
2625       raise errors.ProgrammerError("Wrong template configuration")
2626
2627     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2628     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2629                            logical_id=(vgname, names[0]),
2630                            iv_name = "sda")
2631     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2632                            logical_id=(vgname, names[1]),
2633                            iv_name = "sdb")
2634     disks = [sda_dev, sdb_dev]
2635   elif template_name == "local_raid1":
2636     if len(secondary_nodes) != 0:
2637       raise errors.ProgrammerError("Wrong template configuration")
2638
2639
2640     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2641                                        ".sdb_m1", ".sdb_m2"])
2642     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2643                               logical_id=(vgname, names[0]))
2644     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2645                               logical_id=(vgname, names[1]))
2646     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2647                               size=disk_sz,
2648                               children = [sda_dev_m1, sda_dev_m2])
2649     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2650                               logical_id=(vgname, names[2]))
2651     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2652                               logical_id=(vgname, names[3]))
2653     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2654                               size=swap_sz,
2655                               children = [sdb_dev_m1, sdb_dev_m2])
2656     disks = [md_sda_dev, md_sdb_dev]
2657   elif template_name == constants.DT_REMOTE_RAID1:
2658     if len(secondary_nodes) != 1:
2659       raise errors.ProgrammerError("Wrong template configuration")
2660     remote_node = secondary_nodes[0]
2661     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2662                                        ".sdb_data", ".sdb_meta"])
2663     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2664                                          disk_sz, names[0:2])
2665     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2666                               children = [drbd_sda_dev], size=disk_sz)
2667     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2668                                          swap_sz, names[2:4])
2669     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2670                               children = [drbd_sdb_dev], size=swap_sz)
2671     disks = [md_sda_dev, md_sdb_dev]
2672   elif template_name == constants.DT_DRBD8:
2673     if len(secondary_nodes) != 1:
2674       raise errors.ProgrammerError("Wrong template configuration")
2675     remote_node = secondary_nodes[0]
2676     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2677                                        ".sdb_data", ".sdb_meta"])
2678     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2679                                          disk_sz, names[0:2], "sda")
2680     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2681                                          swap_sz, names[2:4], "sdb")
2682     disks = [drbd_sda_dev, drbd_sdb_dev]
2683   else:
2684     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2685   return disks
2686
2687
2688 def _GetInstanceInfoText(instance):
2689   """Compute that text that should be added to the disk's metadata.
2690
2691   """
2692   return "originstname+%s" % instance.name
2693
2694
2695 def _CreateDisks(cfg, instance):
2696   """Create all disks for an instance.
2697
2698   This abstracts away some work from AddInstance.
2699
2700   Args:
2701     instance: the instance object
2702
2703   Returns:
2704     True or False showing the success of the creation process
2705
2706   """
2707   info = _GetInstanceInfoText(instance)
2708
2709   for device in instance.disks:
2710     logger.Info("creating volume %s for instance %s" %
2711               (device.iv_name, instance.name))
2712     #HARDCODE
2713     for secondary_node in instance.secondary_nodes:
2714       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2715                                         device, False, info):
2716         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2717                      (device.iv_name, device, secondary_node))
2718         return False
2719     #HARDCODE
2720     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2721                                     instance, device, info):
2722       logger.Error("failed to create volume %s on primary!" %
2723                    device.iv_name)
2724       return False
2725   return True
2726
2727
2728 def _RemoveDisks(instance, cfg):
2729   """Remove all disks for an instance.
2730
2731   This abstracts away some work from `AddInstance()` and
2732   `RemoveInstance()`. Note that in case some of the devices couldn't
2733   be removed, the removal will continue with the other ones (compare
2734   with `_CreateDisks()`).
2735
2736   Args:
2737     instance: the instance object
2738
2739   Returns:
2740     True or False showing the success of the removal proces
2741
2742   """
2743   logger.Info("removing block devices for instance %s" % instance.name)
2744
2745   result = True
2746   for device in instance.disks:
2747     for node, disk in device.ComputeNodeTree(instance.primary_node):
2748       cfg.SetDiskID(disk, node)
2749       if not rpc.call_blockdev_remove(node, disk):
2750         logger.Error("could not remove block device %s on node %s,"
2751                      " continuing anyway" %
2752                      (device.iv_name, node))
2753         result = False
2754   return result
2755
2756
2757 class LUCreateInstance(LogicalUnit):
2758   """Create an instance.
2759
2760   """
2761   HPATH = "instance-add"
2762   HTYPE = constants.HTYPE_INSTANCE
2763   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2764               "disk_template", "swap_size", "mode", "start", "vcpus",
2765               "wait_for_sync", "ip_check"]
2766
2767   def BuildHooksEnv(self):
2768     """Build hooks env.
2769
2770     This runs on master, primary and secondary nodes of the instance.
2771
2772     """
2773     env = {
2774       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2775       "INSTANCE_DISK_SIZE": self.op.disk_size,
2776       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2777       "INSTANCE_ADD_MODE": self.op.mode,
2778       }
2779     if self.op.mode == constants.INSTANCE_IMPORT:
2780       env["INSTANCE_SRC_NODE"] = self.op.src_node
2781       env["INSTANCE_SRC_PATH"] = self.op.src_path
2782       env["INSTANCE_SRC_IMAGE"] = self.src_image
2783
2784     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2785       primary_node=self.op.pnode,
2786       secondary_nodes=self.secondaries,
2787       status=self.instance_status,
2788       os_type=self.op.os_type,
2789       memory=self.op.mem_size,
2790       vcpus=self.op.vcpus,
2791       nics=[(self.inst_ip, self.op.bridge)],
2792     ))
2793
2794     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2795           self.secondaries)
2796     return env, nl, nl
2797
2798
2799   def CheckPrereq(self):
2800     """Check prerequisites.
2801
2802     """
2803     if self.op.mode not in (constants.INSTANCE_CREATE,
2804                             constants.INSTANCE_IMPORT):
2805       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2806                                  self.op.mode)
2807
2808     if self.op.mode == constants.INSTANCE_IMPORT:
2809       src_node = getattr(self.op, "src_node", None)
2810       src_path = getattr(self.op, "src_path", None)
2811       if src_node is None or src_path is None:
2812         raise errors.OpPrereqError("Importing an instance requires source"
2813                                    " node and path options")
2814       src_node_full = self.cfg.ExpandNodeName(src_node)
2815       if src_node_full is None:
2816         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2817       self.op.src_node = src_node = src_node_full
2818
2819       if not os.path.isabs(src_path):
2820         raise errors.OpPrereqError("The source path must be absolute")
2821
2822       export_info = rpc.call_export_info(src_node, src_path)
2823
2824       if not export_info:
2825         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2826
2827       if not export_info.has_section(constants.INISECT_EXP):
2828         raise errors.ProgrammerError("Corrupted export config")
2829
2830       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2831       if (int(ei_version) != constants.EXPORT_VERSION):
2832         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2833                                    (ei_version, constants.EXPORT_VERSION))
2834
2835       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2836         raise errors.OpPrereqError("Can't import instance with more than"
2837                                    " one data disk")
2838
2839       # FIXME: are the old os-es, disk sizes, etc. useful?
2840       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2841       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2842                                                          'disk0_dump'))
2843       self.src_image = diskimage
2844     else: # INSTANCE_CREATE
2845       if getattr(self.op, "os_type", None) is None:
2846         raise errors.OpPrereqError("No guest OS specified")
2847
2848     # check primary node
2849     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2850     if pnode is None:
2851       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2852                                  self.op.pnode)
2853     self.op.pnode = pnode.name
2854     self.pnode = pnode
2855     self.secondaries = []
2856     # disk template and mirror node verification
2857     if self.op.disk_template not in constants.DISK_TEMPLATES:
2858       raise errors.OpPrereqError("Invalid disk template name")
2859
2860     if self.op.disk_template in constants.DTS_NET_MIRROR:
2861       if getattr(self.op, "snode", None) is None:
2862         raise errors.OpPrereqError("The networked disk templates need"
2863                                    " a mirror node")
2864
2865       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2866       if snode_name is None:
2867         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2868                                    self.op.snode)
2869       elif snode_name == pnode.name:
2870         raise errors.OpPrereqError("The secondary node cannot be"
2871                                    " the primary node.")
2872       self.secondaries.append(snode_name)
2873
2874     # Check lv size requirements
2875     nodenames = [pnode.name] + self.secondaries
2876     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2877
2878     # Required free disk space as a function of disk and swap space
2879     req_size_dict = {
2880       constants.DT_DISKLESS: 0,
2881       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2882       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2883       # 256 MB are added for drbd metadata, 128MB for each drbd device
2884       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2885       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2886     }
2887
2888     if self.op.disk_template not in req_size_dict:
2889       raise errors.ProgrammerError("Disk template '%s' size requirement"
2890                                    " is unknown" %  self.op.disk_template)
2891
2892     req_size = req_size_dict[self.op.disk_template]
2893
2894     for node in nodenames:
2895       info = nodeinfo.get(node, None)
2896       if not info:
2897         raise errors.OpPrereqError("Cannot get current information"
2898                                    " from node '%s'" % nodeinfo)
2899       if req_size > info['vg_free']:
2900         raise errors.OpPrereqError("Not enough disk space on target node %s."
2901                                    " %d MB available, %d MB required" %
2902                                    (node, info['vg_free'], req_size))
2903
2904     # os verification
2905     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2906     if not os_obj:
2907       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2908                                  " primary node"  % self.op.os_type)
2909
2910     # instance verification
2911     hostname1 = utils.HostInfo(self.op.instance_name)
2912
2913     self.op.instance_name = instance_name = hostname1.name
2914     instance_list = self.cfg.GetInstanceList()
2915     if instance_name in instance_list:
2916       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2917                                  instance_name)
2918
2919     ip = getattr(self.op, "ip", None)
2920     if ip is None or ip.lower() == "none":
2921       inst_ip = None
2922     elif ip.lower() == "auto":
2923       inst_ip = hostname1.ip
2924     else:
2925       if not utils.IsValidIP(ip):
2926         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2927                                    " like a valid IP" % ip)
2928       inst_ip = ip
2929     self.inst_ip = inst_ip
2930
2931     if self.op.start and not self.op.ip_check:
2932       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2933                                  " adding an instance in start mode")
2934
2935     if self.op.ip_check:
2936       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2937                        constants.DEFAULT_NODED_PORT):
2938         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2939                                    (hostname1.ip, instance_name))
2940
2941     # bridge verification
2942     bridge = getattr(self.op, "bridge", None)
2943     if bridge is None:
2944       self.op.bridge = self.cfg.GetDefBridge()
2945     else:
2946       self.op.bridge = bridge
2947
2948     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2949       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2950                                  " destination node '%s'" %
2951                                  (self.op.bridge, pnode.name))
2952
2953     if self.op.start:
2954       self.instance_status = 'up'
2955     else:
2956       self.instance_status = 'down'
2957
2958   def Exec(self, feedback_fn):
2959     """Create and add the instance to the cluster.
2960
2961     """
2962     instance = self.op.instance_name
2963     pnode_name = self.pnode.name
2964
2965     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2966     if self.inst_ip is not None:
2967       nic.ip = self.inst_ip
2968
2969     disks = _GenerateDiskTemplate(self.cfg,
2970                                   self.op.disk_template,
2971                                   instance, pnode_name,
2972                                   self.secondaries, self.op.disk_size,
2973                                   self.op.swap_size)
2974
2975     iobj = objects.Instance(name=instance, os=self.op.os_type,
2976                             primary_node=pnode_name,
2977                             memory=self.op.mem_size,
2978                             vcpus=self.op.vcpus,
2979                             nics=[nic], disks=disks,
2980                             disk_template=self.op.disk_template,
2981                             status=self.instance_status,
2982                             )
2983
2984     feedback_fn("* creating instance disks...")
2985     if not _CreateDisks(self.cfg, iobj):
2986       _RemoveDisks(iobj, self.cfg)
2987       raise errors.OpExecError("Device creation failed, reverting...")
2988
2989     feedback_fn("adding instance %s to cluster config" % instance)
2990
2991     self.cfg.AddInstance(iobj)
2992
2993     if self.op.wait_for_sync:
2994       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
2995     elif iobj.disk_template in constants.DTS_NET_MIRROR:
2996       # make sure the disks are not degraded (still sync-ing is ok)
2997       time.sleep(15)
2998       feedback_fn("* checking mirrors status")
2999       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3000     else:
3001       disk_abort = False
3002
3003     if disk_abort:
3004       _RemoveDisks(iobj, self.cfg)
3005       self.cfg.RemoveInstance(iobj.name)
3006       raise errors.OpExecError("There are some degraded disks for"
3007                                " this instance")
3008
3009     feedback_fn("creating os for instance %s on node %s" %
3010                 (instance, pnode_name))
3011
3012     if iobj.disk_template != constants.DT_DISKLESS:
3013       if self.op.mode == constants.INSTANCE_CREATE:
3014         feedback_fn("* running the instance OS create scripts...")
3015         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3016           raise errors.OpExecError("could not add os for instance %s"
3017                                    " on node %s" %
3018                                    (instance, pnode_name))
3019
3020       elif self.op.mode == constants.INSTANCE_IMPORT:
3021         feedback_fn("* running the instance OS import scripts...")
3022         src_node = self.op.src_node
3023         src_image = self.src_image
3024         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3025                                                 src_node, src_image):
3026           raise errors.OpExecError("Could not import os for instance"
3027                                    " %s on node %s" %
3028                                    (instance, pnode_name))
3029       else:
3030         # also checked in the prereq part
3031         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3032                                      % self.op.mode)
3033
3034     if self.op.start:
3035       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3036       feedback_fn("* starting instance...")
3037       if not rpc.call_instance_start(pnode_name, iobj, None):
3038         raise errors.OpExecError("Could not start instance")
3039
3040
3041 class LUConnectConsole(NoHooksLU):
3042   """Connect to an instance's console.
3043
3044   This is somewhat special in that it returns the command line that
3045   you need to run on the master node in order to connect to the
3046   console.
3047
3048   """
3049   _OP_REQP = ["instance_name"]
3050
3051   def CheckPrereq(self):
3052     """Check prerequisites.
3053
3054     This checks that the instance is in the cluster.
3055
3056     """
3057     instance = self.cfg.GetInstanceInfo(
3058       self.cfg.ExpandInstanceName(self.op.instance_name))
3059     if instance is None:
3060       raise errors.OpPrereqError("Instance '%s' not known" %
3061                                  self.op.instance_name)
3062     self.instance = instance
3063
3064   def Exec(self, feedback_fn):
3065     """Connect to the console of an instance
3066
3067     """
3068     instance = self.instance
3069     node = instance.primary_node
3070
3071     node_insts = rpc.call_instance_list([node])[node]
3072     if node_insts is False:
3073       raise errors.OpExecError("Can't connect to node %s." % node)
3074
3075     if instance.name not in node_insts:
3076       raise errors.OpExecError("Instance %s is not running." % instance.name)
3077
3078     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3079
3080     hyper = hypervisor.GetHypervisor()
3081     console_cmd = hyper.GetShellCommandForConsole(instance.name)
3082     # build ssh cmdline
3083     argv = ["ssh", "-q", "-t"]
3084     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3085     argv.extend(ssh.BATCH_MODE_OPTS)
3086     argv.append(node)
3087     argv.append(console_cmd)
3088     return "ssh", argv
3089
3090
3091 class LUAddMDDRBDComponent(LogicalUnit):
3092   """Adda new mirror member to an instance's disk.
3093
3094   """
3095   HPATH = "mirror-add"
3096   HTYPE = constants.HTYPE_INSTANCE
3097   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3098
3099   def BuildHooksEnv(self):
3100     """Build hooks env.
3101
3102     This runs on the master, the primary and all the secondaries.
3103
3104     """
3105     env = {
3106       "NEW_SECONDARY": self.op.remote_node,
3107       "DISK_NAME": self.op.disk_name,
3108       }
3109     env.update(_BuildInstanceHookEnvByObject(self.instance))
3110     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3111           self.op.remote_node,] + list(self.instance.secondary_nodes)
3112     return env, nl, nl
3113
3114   def CheckPrereq(self):
3115     """Check prerequisites.
3116
3117     This checks that the instance is in the cluster.
3118
3119     """
3120     instance = self.cfg.GetInstanceInfo(
3121       self.cfg.ExpandInstanceName(self.op.instance_name))
3122     if instance is None:
3123       raise errors.OpPrereqError("Instance '%s' not known" %
3124                                  self.op.instance_name)
3125     self.instance = instance
3126
3127     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3128     if remote_node is None:
3129       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3130     self.remote_node = remote_node
3131
3132     if remote_node == instance.primary_node:
3133       raise errors.OpPrereqError("The specified node is the primary node of"
3134                                  " the instance.")
3135
3136     if instance.disk_template != constants.DT_REMOTE_RAID1:
3137       raise errors.OpPrereqError("Instance's disk layout is not"
3138                                  " remote_raid1.")
3139     for disk in instance.disks:
3140       if disk.iv_name == self.op.disk_name:
3141         break
3142     else:
3143       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3144                                  " instance." % self.op.disk_name)
3145     if len(disk.children) > 1:
3146       raise errors.OpPrereqError("The device already has two slave devices."
3147                                  " This would create a 3-disk raid1 which we"
3148                                  " don't allow.")
3149     self.disk = disk
3150
3151   def Exec(self, feedback_fn):
3152     """Add the mirror component
3153
3154     """
3155     disk = self.disk
3156     instance = self.instance
3157
3158     remote_node = self.remote_node
3159     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3160     names = _GenerateUniqueNames(self.cfg, lv_names)
3161     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3162                                      remote_node, disk.size, names)
3163
3164     logger.Info("adding new mirror component on secondary")
3165     #HARDCODE
3166     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3167                                       new_drbd, False,
3168                                       _GetInstanceInfoText(instance)):
3169       raise errors.OpExecError("Failed to create new component on secondary"
3170                                " node %s" % remote_node)
3171
3172     logger.Info("adding new mirror component on primary")
3173     #HARDCODE
3174     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3175                                     instance, new_drbd,
3176                                     _GetInstanceInfoText(instance)):
3177       # remove secondary dev
3178       self.cfg.SetDiskID(new_drbd, remote_node)
3179       rpc.call_blockdev_remove(remote_node, new_drbd)
3180       raise errors.OpExecError("Failed to create volume on primary")
3181
3182     # the device exists now
3183     # call the primary node to add the mirror to md
3184     logger.Info("adding new mirror component to md")
3185     if not rpc.call_blockdev_addchildren(instance.primary_node,
3186                                          disk, [new_drbd]):
3187       logger.Error("Can't add mirror compoment to md!")
3188       self.cfg.SetDiskID(new_drbd, remote_node)
3189       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3190         logger.Error("Can't rollback on secondary")
3191       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3192       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3193         logger.Error("Can't rollback on primary")
3194       raise errors.OpExecError("Can't add mirror component to md array")
3195
3196     disk.children.append(new_drbd)
3197
3198     self.cfg.AddInstance(instance)
3199
3200     _WaitForSync(self.cfg, instance, self.proc)
3201
3202     return 0
3203
3204
3205 class LURemoveMDDRBDComponent(LogicalUnit):
3206   """Remove a component from a remote_raid1 disk.
3207
3208   """
3209   HPATH = "mirror-remove"
3210   HTYPE = constants.HTYPE_INSTANCE
3211   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3212
3213   def BuildHooksEnv(self):
3214     """Build hooks env.
3215
3216     This runs on the master, the primary and all the secondaries.
3217
3218     """
3219     env = {
3220       "DISK_NAME": self.op.disk_name,
3221       "DISK_ID": self.op.disk_id,
3222       "OLD_SECONDARY": self.old_secondary,
3223       }
3224     env.update(_BuildInstanceHookEnvByObject(self.instance))
3225     nl = [self.sstore.GetMasterNode(),
3226           self.instance.primary_node] + list(self.instance.secondary_nodes)
3227     return env, nl, nl
3228
3229   def CheckPrereq(self):
3230     """Check prerequisites.
3231
3232     This checks that the instance is in the cluster.
3233
3234     """
3235     instance = self.cfg.GetInstanceInfo(
3236       self.cfg.ExpandInstanceName(self.op.instance_name))
3237     if instance is None:
3238       raise errors.OpPrereqError("Instance '%s' not known" %
3239                                  self.op.instance_name)
3240     self.instance = instance
3241
3242     if instance.disk_template != constants.DT_REMOTE_RAID1:
3243       raise errors.OpPrereqError("Instance's disk layout is not"
3244                                  " remote_raid1.")
3245     for disk in instance.disks:
3246       if disk.iv_name == self.op.disk_name:
3247         break
3248     else:
3249       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3250                                  " instance." % self.op.disk_name)
3251     for child in disk.children:
3252       if (child.dev_type == constants.LD_DRBD7 and
3253           child.logical_id[2] == self.op.disk_id):
3254         break
3255     else:
3256       raise errors.OpPrereqError("Can't find the device with this port.")
3257
3258     if len(disk.children) < 2:
3259       raise errors.OpPrereqError("Cannot remove the last component from"
3260                                  " a mirror.")
3261     self.disk = disk
3262     self.child = child
3263     if self.child.logical_id[0] == instance.primary_node:
3264       oid = 1
3265     else:
3266       oid = 0
3267     self.old_secondary = self.child.logical_id[oid]
3268
3269   def Exec(self, feedback_fn):
3270     """Remove the mirror component
3271
3272     """
3273     instance = self.instance
3274     disk = self.disk
3275     child = self.child
3276     logger.Info("remove mirror component")
3277     self.cfg.SetDiskID(disk, instance.primary_node)
3278     if not rpc.call_blockdev_removechildren(instance.primary_node,
3279                                             disk, [child]):
3280       raise errors.OpExecError("Can't remove child from mirror.")
3281
3282     for node in child.logical_id[:2]:
3283       self.cfg.SetDiskID(child, node)
3284       if not rpc.call_blockdev_remove(node, child):
3285         logger.Error("Warning: failed to remove device from node %s,"
3286                      " continuing operation." % node)
3287
3288     disk.children.remove(child)
3289     self.cfg.AddInstance(instance)
3290
3291
3292 class LUReplaceDisks(LogicalUnit):
3293   """Replace the disks of an instance.
3294
3295   """
3296   HPATH = "mirrors-replace"
3297   HTYPE = constants.HTYPE_INSTANCE
3298   _OP_REQP = ["instance_name", "mode", "disks"]
3299
3300   def BuildHooksEnv(self):
3301     """Build hooks env.
3302
3303     This runs on the master, the primary and all the secondaries.
3304
3305     """
3306     env = {
3307       "MODE": self.op.mode,
3308       "NEW_SECONDARY": self.op.remote_node,
3309       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3310       }
3311     env.update(_BuildInstanceHookEnvByObject(self.instance))
3312     nl = [
3313       self.sstore.GetMasterNode(),
3314       self.instance.primary_node,
3315       ]
3316     if self.op.remote_node is not None:
3317       nl.append(self.op.remote_node)
3318     return env, nl, nl
3319
3320   def CheckPrereq(self):
3321     """Check prerequisites.
3322
3323     This checks that the instance is in the cluster.
3324
3325     """
3326     instance = self.cfg.GetInstanceInfo(
3327       self.cfg.ExpandInstanceName(self.op.instance_name))
3328     if instance is None:
3329       raise errors.OpPrereqError("Instance '%s' not known" %
3330                                  self.op.instance_name)
3331     self.instance = instance
3332     self.op.instance_name = instance.name
3333
3334     if instance.disk_template not in constants.DTS_NET_MIRROR:
3335       raise errors.OpPrereqError("Instance's disk layout is not"
3336                                  " network mirrored.")
3337
3338     if len(instance.secondary_nodes) != 1:
3339       raise errors.OpPrereqError("The instance has a strange layout,"
3340                                  " expected one secondary but found %d" %
3341                                  len(instance.secondary_nodes))
3342
3343     self.sec_node = instance.secondary_nodes[0]
3344
3345     remote_node = getattr(self.op, "remote_node", None)
3346     if remote_node is not None:
3347       remote_node = self.cfg.ExpandNodeName(remote_node)
3348       if remote_node is None:
3349         raise errors.OpPrereqError("Node '%s' not known" %
3350                                    self.op.remote_node)
3351       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3352     else:
3353       self.remote_node_info = None
3354     if remote_node == instance.primary_node:
3355       raise errors.OpPrereqError("The specified node is the primary node of"
3356                                  " the instance.")
3357     elif remote_node == self.sec_node:
3358       if self.op.mode == constants.REPLACE_DISK_SEC:
3359         # this is for DRBD8, where we can't execute the same mode of
3360         # replacement as for drbd7 (no different port allocated)
3361         raise errors.OpPrereqError("Same secondary given, cannot execute"
3362                                    " replacement")
3363       # the user gave the current secondary, switch to
3364       # 'no-replace-secondary' mode for drbd7
3365       remote_node = None
3366     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3367         self.op.mode != constants.REPLACE_DISK_ALL):
3368       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3369                                  " disks replacement, not individual ones")
3370     if instance.disk_template == constants.DT_DRBD8:
3371       if (self.op.mode == constants.REPLACE_DISK_ALL and
3372           remote_node is not None):
3373         # switch to replace secondary mode
3374         self.op.mode = constants.REPLACE_DISK_SEC
3375
3376       if self.op.mode == constants.REPLACE_DISK_ALL:
3377         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3378                                    " secondary disk replacement, not"
3379                                    " both at once")
3380       elif self.op.mode == constants.REPLACE_DISK_PRI:
3381         if remote_node is not None:
3382           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3383                                      " the secondary while doing a primary"
3384                                      " node disk replacement")
3385         self.tgt_node = instance.primary_node
3386         self.oth_node = instance.secondary_nodes[0]
3387       elif self.op.mode == constants.REPLACE_DISK_SEC:
3388         self.new_node = remote_node # this can be None, in which case
3389                                     # we don't change the secondary
3390         self.tgt_node = instance.secondary_nodes[0]
3391         self.oth_node = instance.primary_node
3392       else:
3393         raise errors.ProgrammerError("Unhandled disk replace mode")
3394
3395     for name in self.op.disks:
3396       if instance.FindDisk(name) is None:
3397         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3398                                    (name, instance.name))
3399     self.op.remote_node = remote_node
3400
3401   def _ExecRR1(self, feedback_fn):
3402     """Replace the disks of an instance.
3403
3404     """
3405     instance = self.instance
3406     iv_names = {}
3407     # start of work
3408     if self.op.remote_node is None:
3409       remote_node = self.sec_node
3410     else:
3411       remote_node = self.op.remote_node
3412     cfg = self.cfg
3413     for dev in instance.disks:
3414       size = dev.size
3415       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3416       names = _GenerateUniqueNames(cfg, lv_names)
3417       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3418                                        remote_node, size, names)
3419       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3420       logger.Info("adding new mirror component on secondary for %s" %
3421                   dev.iv_name)
3422       #HARDCODE
3423       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3424                                         new_drbd, False,
3425                                         _GetInstanceInfoText(instance)):
3426         raise errors.OpExecError("Failed to create new component on secondary"
3427                                  " node %s. Full abort, cleanup manually!" %
3428                                  remote_node)
3429
3430       logger.Info("adding new mirror component on primary")
3431       #HARDCODE
3432       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3433                                       instance, new_drbd,
3434                                       _GetInstanceInfoText(instance)):
3435         # remove secondary dev
3436         cfg.SetDiskID(new_drbd, remote_node)
3437         rpc.call_blockdev_remove(remote_node, new_drbd)
3438         raise errors.OpExecError("Failed to create volume on primary!"
3439                                  " Full abort, cleanup manually!!")
3440
3441       # the device exists now
3442       # call the primary node to add the mirror to md
3443       logger.Info("adding new mirror component to md")
3444       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3445                                            [new_drbd]):
3446         logger.Error("Can't add mirror compoment to md!")
3447         cfg.SetDiskID(new_drbd, remote_node)
3448         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3449           logger.Error("Can't rollback on secondary")
3450         cfg.SetDiskID(new_drbd, instance.primary_node)
3451         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3452           logger.Error("Can't rollback on primary")
3453         raise errors.OpExecError("Full abort, cleanup manually!!")
3454
3455       dev.children.append(new_drbd)
3456       cfg.AddInstance(instance)
3457
3458     # this can fail as the old devices are degraded and _WaitForSync
3459     # does a combined result over all disks, so we don't check its
3460     # return value
3461     _WaitForSync(cfg, instance, self.proc, unlock=True)
3462
3463     # so check manually all the devices
3464     for name in iv_names:
3465       dev, child, new_drbd = iv_names[name]
3466       cfg.SetDiskID(dev, instance.primary_node)
3467       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3468       if is_degr:
3469         raise errors.OpExecError("MD device %s is degraded!" % name)
3470       cfg.SetDiskID(new_drbd, instance.primary_node)
3471       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3472       if is_degr:
3473         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3474
3475     for name in iv_names:
3476       dev, child, new_drbd = iv_names[name]
3477       logger.Info("remove mirror %s component" % name)
3478       cfg.SetDiskID(dev, instance.primary_node)
3479       if not rpc.call_blockdev_removechildren(instance.primary_node,
3480                                               dev, [child]):
3481         logger.Error("Can't remove child from mirror, aborting"
3482                      " *this device cleanup*.\nYou need to cleanup manually!!")
3483         continue
3484
3485       for node in child.logical_id[:2]:
3486         logger.Info("remove child device on %s" % node)
3487         cfg.SetDiskID(child, node)
3488         if not rpc.call_blockdev_remove(node, child):
3489           logger.Error("Warning: failed to remove device from node %s,"
3490                        " continuing operation." % node)
3491
3492       dev.children.remove(child)
3493
3494       cfg.AddInstance(instance)
3495
3496   def _ExecD8DiskOnly(self, feedback_fn):
3497     """Replace a disk on the primary or secondary for dbrd8.
3498
3499     The algorithm for replace is quite complicated:
3500       - for each disk to be replaced:
3501         - create new LVs on the target node with unique names
3502         - detach old LVs from the drbd device
3503         - rename old LVs to name_replaced.<time_t>
3504         - rename new LVs to old LVs
3505         - attach the new LVs (with the old names now) to the drbd device
3506       - wait for sync across all devices
3507       - for each modified disk:
3508         - remove old LVs (which have the name name_replaces.<time_t>)
3509
3510     Failures are not very well handled.
3511
3512     """
3513     steps_total = 6
3514     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3515     instance = self.instance
3516     iv_names = {}
3517     vgname = self.cfg.GetVGName()
3518     # start of work
3519     cfg = self.cfg
3520     tgt_node = self.tgt_node
3521     oth_node = self.oth_node
3522
3523     # Step: check device activation
3524     self.proc.LogStep(1, steps_total, "check device existence")
3525     info("checking volume groups")
3526     my_vg = cfg.GetVGName()
3527     results = rpc.call_vg_list([oth_node, tgt_node])
3528     if not results:
3529       raise errors.OpExecError("Can't list volume groups on the nodes")
3530     for node in oth_node, tgt_node:
3531       res = results.get(node, False)
3532       if not res or my_vg not in res:
3533         raise errors.OpExecError("Volume group '%s' not found on %s" %
3534                                  (my_vg, node))
3535     for dev in instance.disks:
3536       if not dev.iv_name in self.op.disks:
3537         continue
3538       for node in tgt_node, oth_node:
3539         info("checking %s on %s" % (dev.iv_name, node))
3540         cfg.SetDiskID(dev, node)
3541         if not rpc.call_blockdev_find(node, dev):
3542           raise errors.OpExecError("Can't find device %s on node %s" %
3543                                    (dev.iv_name, node))
3544
3545     # Step: check other node consistency
3546     self.proc.LogStep(2, steps_total, "check peer consistency")
3547     for dev in instance.disks:
3548       if not dev.iv_name in self.op.disks:
3549         continue
3550       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3551       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3552                                    oth_node==instance.primary_node):
3553         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3554                                  " to replace disks on this node (%s)" %
3555                                  (oth_node, tgt_node))
3556
3557     # Step: create new storage
3558     self.proc.LogStep(3, steps_total, "allocate new storage")
3559     for dev in instance.disks:
3560       if not dev.iv_name in self.op.disks:
3561         continue
3562       size = dev.size
3563       cfg.SetDiskID(dev, tgt_node)
3564       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3565       names = _GenerateUniqueNames(cfg, lv_names)
3566       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3567                              logical_id=(vgname, names[0]))
3568       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3569                              logical_id=(vgname, names[1]))
3570       new_lvs = [lv_data, lv_meta]
3571       old_lvs = dev.children
3572       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3573       info("creating new local storage on %s for %s" %
3574            (tgt_node, dev.iv_name))
3575       # since we *always* want to create this LV, we use the
3576       # _Create...OnPrimary (which forces the creation), even if we
3577       # are talking about the secondary node
3578       for new_lv in new_lvs:
3579         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3580                                         _GetInstanceInfoText(instance)):
3581           raise errors.OpExecError("Failed to create new LV named '%s' on"
3582                                    " node '%s'" %
3583                                    (new_lv.logical_id[1], tgt_node))
3584
3585     # Step: for each lv, detach+rename*2+attach
3586     self.proc.LogStep(4, steps_total, "change drbd configuration")
3587     for dev, old_lvs, new_lvs in iv_names.itervalues():
3588       info("detaching %s drbd from local storage" % dev.iv_name)
3589       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3590         raise errors.OpExecError("Can't detach drbd from local storage on node"
3591                                  " %s for device %s" % (tgt_node, dev.iv_name))
3592       #dev.children = []
3593       #cfg.Update(instance)
3594
3595       # ok, we created the new LVs, so now we know we have the needed
3596       # storage; as such, we proceed on the target node to rename
3597       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3598       # using the assumption than logical_id == physical_id (which in
3599       # turn is the unique_id on that node)
3600
3601       # FIXME(iustin): use a better name for the replaced LVs
3602       temp_suffix = int(time.time())
3603       ren_fn = lambda d, suff: (d.physical_id[0],
3604                                 d.physical_id[1] + "_replaced-%s" % suff)
3605       # build the rename list based on what LVs exist on the node
3606       rlist = []
3607       for to_ren in old_lvs:
3608         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3609         if find_res is not None: # device exists
3610           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3611
3612       info("renaming the old LVs on the target node")
3613       if not rpc.call_blockdev_rename(tgt_node, rlist):
3614         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3615       # now we rename the new LVs to the old LVs
3616       info("renaming the new LVs on the target node")
3617       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3618       if not rpc.call_blockdev_rename(tgt_node, rlist):
3619         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3620
3621       for old, new in zip(old_lvs, new_lvs):
3622         new.logical_id = old.logical_id
3623         cfg.SetDiskID(new, tgt_node)
3624
3625       for disk in old_lvs:
3626         disk.logical_id = ren_fn(disk, temp_suffix)
3627         cfg.SetDiskID(disk, tgt_node)
3628
3629       # now that the new lvs have the old name, we can add them to the device
3630       info("adding new mirror component on %s" % tgt_node)
3631       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3632         for new_lv in new_lvs:
3633           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3634             warning("Can't rollback device %s", "manually cleanup unused"
3635                     " logical volumes")
3636         raise errors.OpExecError("Can't add local storage to drbd")
3637
3638       dev.children = new_lvs
3639       cfg.Update(instance)
3640
3641     # Step: wait for sync
3642
3643     # this can fail as the old devices are degraded and _WaitForSync
3644     # does a combined result over all disks, so we don't check its
3645     # return value
3646     self.proc.LogStep(5, steps_total, "sync devices")
3647     _WaitForSync(cfg, instance, self.proc, unlock=True)
3648
3649     # so check manually all the devices
3650     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3651       cfg.SetDiskID(dev, instance.primary_node)
3652       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3653       if is_degr:
3654         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3655
3656     # Step: remove old storage
3657     self.proc.LogStep(6, steps_total, "removing old storage")
3658     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3659       info("remove logical volumes for %s" % name)
3660       for lv in old_lvs:
3661         cfg.SetDiskID(lv, tgt_node)
3662         if not rpc.call_blockdev_remove(tgt_node, lv):
3663           warning("Can't remove old LV", "manually remove unused LVs")
3664           continue
3665
3666   def _ExecD8Secondary(self, feedback_fn):
3667     """Replace the secondary node for drbd8.
3668
3669     The algorithm for replace is quite complicated:
3670       - for all disks of the instance:
3671         - create new LVs on the new node with same names
3672         - shutdown the drbd device on the old secondary
3673         - disconnect the drbd network on the primary
3674         - create the drbd device on the new secondary
3675         - network attach the drbd on the primary, using an artifice:
3676           the drbd code for Attach() will connect to the network if it
3677           finds a device which is connected to the good local disks but
3678           not network enabled
3679       - wait for sync across all devices
3680       - remove all disks from the old secondary
3681
3682     Failures are not very well handled.
3683
3684     """
3685     steps_total = 6
3686     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3687     instance = self.instance
3688     iv_names = {}
3689     vgname = self.cfg.GetVGName()
3690     # start of work
3691     cfg = self.cfg
3692     old_node = self.tgt_node
3693     new_node = self.new_node
3694     pri_node = instance.primary_node
3695
3696     # Step: check device activation
3697     self.proc.LogStep(1, steps_total, "check device existence")
3698     info("checking volume groups")
3699     my_vg = cfg.GetVGName()
3700     results = rpc.call_vg_list([pri_node, new_node])
3701     if not results:
3702       raise errors.OpExecError("Can't list volume groups on the nodes")
3703     for node in pri_node, new_node:
3704       res = results.get(node, False)
3705       if not res or my_vg not in res:
3706         raise errors.OpExecError("Volume group '%s' not found on %s" %
3707                                  (my_vg, node))
3708     for dev in instance.disks:
3709       if not dev.iv_name in self.op.disks:
3710         continue
3711       info("checking %s on %s" % (dev.iv_name, pri_node))
3712       cfg.SetDiskID(dev, pri_node)
3713       if not rpc.call_blockdev_find(pri_node, dev):
3714         raise errors.OpExecError("Can't find device %s on node %s" %
3715                                  (dev.iv_name, pri_node))
3716
3717     # Step: check other node consistency
3718     self.proc.LogStep(2, steps_total, "check peer consistency")
3719     for dev in instance.disks:
3720       if not dev.iv_name in self.op.disks:
3721         continue
3722       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3723       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3724         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3725                                  " unsafe to replace the secondary" %
3726                                  pri_node)
3727
3728     # Step: create new storage
3729     self.proc.LogStep(3, steps_total, "allocate new storage")
3730     for dev in instance.disks:
3731       size = dev.size
3732       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3733       # since we *always* want to create this LV, we use the
3734       # _Create...OnPrimary (which forces the creation), even if we
3735       # are talking about the secondary node
3736       for new_lv in dev.children:
3737         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3738                                         _GetInstanceInfoText(instance)):
3739           raise errors.OpExecError("Failed to create new LV named '%s' on"
3740                                    " node '%s'" %
3741                                    (new_lv.logical_id[1], new_node))
3742
3743       iv_names[dev.iv_name] = (dev, dev.children)
3744
3745     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3746     for dev in instance.disks:
3747       size = dev.size
3748       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3749       # create new devices on new_node
3750       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3751                               logical_id=(pri_node, new_node,
3752                                           dev.logical_id[2]),
3753                               children=dev.children)
3754       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3755                                         new_drbd, False,
3756                                       _GetInstanceInfoText(instance)):
3757         raise errors.OpExecError("Failed to create new DRBD on"
3758                                  " node '%s'" % new_node)
3759
3760     for dev in instance.disks:
3761       # we have new devices, shutdown the drbd on the old secondary
3762       info("shutting down drbd for %s on old node" % dev.iv_name)
3763       cfg.SetDiskID(dev, old_node)
3764       if not rpc.call_blockdev_shutdown(old_node, dev):
3765         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3766                 "Please cleanup this device manually as soon as possible")
3767
3768     info("detaching primary drbds from the network (=> standalone)")
3769     done = 0
3770     for dev in instance.disks:
3771       cfg.SetDiskID(dev, pri_node)
3772       # set the physical (unique in bdev terms) id to None, meaning
3773       # detach from network
3774       dev.physical_id = (None,) * len(dev.physical_id)
3775       # and 'find' the device, which will 'fix' it to match the
3776       # standalone state
3777       if rpc.call_blockdev_find(pri_node, dev):
3778         done += 1
3779       else:
3780         warning("Failed to detach drbd %s from network, unusual case" %
3781                 dev.iv_name)
3782
3783     if not done:
3784       # no detaches succeeded (very unlikely)
3785       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3786
3787     # if we managed to detach at least one, we update all the disks of
3788     # the instance to point to the new secondary
3789     info("updating instance configuration")
3790     for dev in instance.disks:
3791       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3792       cfg.SetDiskID(dev, pri_node)
3793     cfg.Update(instance)
3794
3795     # and now perform the drbd attach
3796     info("attaching primary drbds to new secondary (standalone => connected)")
3797     failures = []
3798     for dev in instance.disks:
3799       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3800       # since the attach is smart, it's enough to 'find' the device,
3801       # it will automatically activate the network, if the physical_id
3802       # is correct
3803       cfg.SetDiskID(dev, pri_node)
3804       if not rpc.call_blockdev_find(pri_node, dev):
3805         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3806                 "please do a gnt-instance info to see the status of disks")
3807
3808     # this can fail as the old devices are degraded and _WaitForSync
3809     # does a combined result over all disks, so we don't check its
3810     # return value
3811     self.proc.LogStep(5, steps_total, "sync devices")
3812     _WaitForSync(cfg, instance, self.proc, unlock=True)
3813
3814     # so check manually all the devices
3815     for name, (dev, old_lvs) in iv_names.iteritems():
3816       cfg.SetDiskID(dev, pri_node)
3817       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3818       if is_degr:
3819         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3820
3821     self.proc.LogStep(6, steps_total, "removing old storage")
3822     for name, (dev, old_lvs) in iv_names.iteritems():
3823       info("remove logical volumes for %s" % name)
3824       for lv in old_lvs:
3825         cfg.SetDiskID(lv, old_node)
3826         if not rpc.call_blockdev_remove(old_node, lv):
3827           warning("Can't remove LV on old secondary",
3828                   "Cleanup stale volumes by hand")
3829
3830   def Exec(self, feedback_fn):
3831     """Execute disk replacement.
3832
3833     This dispatches the disk replacement to the appropriate handler.
3834
3835     """
3836     instance = self.instance
3837     if instance.disk_template == constants.DT_REMOTE_RAID1:
3838       fn = self._ExecRR1
3839     elif instance.disk_template == constants.DT_DRBD8:
3840       if self.op.remote_node is None:
3841         fn = self._ExecD8DiskOnly
3842       else:
3843         fn = self._ExecD8Secondary
3844     else:
3845       raise errors.ProgrammerError("Unhandled disk replacement case")
3846     return fn(feedback_fn)
3847
3848
3849 class LUQueryInstanceData(NoHooksLU):
3850   """Query runtime instance data.
3851
3852   """
3853   _OP_REQP = ["instances"]
3854
3855   def CheckPrereq(self):
3856     """Check prerequisites.
3857
3858     This only checks the optional instance list against the existing names.
3859
3860     """
3861     if not isinstance(self.op.instances, list):
3862       raise errors.OpPrereqError("Invalid argument type 'instances'")
3863     if self.op.instances:
3864       self.wanted_instances = []
3865       names = self.op.instances
3866       for name in names:
3867         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3868         if instance is None:
3869           raise errors.OpPrereqError("No such instance name '%s'" % name)
3870       self.wanted_instances.append(instance)
3871     else:
3872       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3873                                in self.cfg.GetInstanceList()]
3874     return
3875
3876
3877   def _ComputeDiskStatus(self, instance, snode, dev):
3878     """Compute block device status.
3879
3880     """
3881     self.cfg.SetDiskID(dev, instance.primary_node)
3882     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3883     if dev.dev_type in constants.LDS_DRBD:
3884       # we change the snode then (otherwise we use the one passed in)
3885       if dev.logical_id[0] == instance.primary_node:
3886         snode = dev.logical_id[1]
3887       else:
3888         snode = dev.logical_id[0]
3889
3890     if snode:
3891       self.cfg.SetDiskID(dev, snode)
3892       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3893     else:
3894       dev_sstatus = None
3895
3896     if dev.children:
3897       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3898                       for child in dev.children]
3899     else:
3900       dev_children = []
3901
3902     data = {
3903       "iv_name": dev.iv_name,
3904       "dev_type": dev.dev_type,
3905       "logical_id": dev.logical_id,
3906       "physical_id": dev.physical_id,
3907       "pstatus": dev_pstatus,
3908       "sstatus": dev_sstatus,
3909       "children": dev_children,
3910       }
3911
3912     return data
3913
3914   def Exec(self, feedback_fn):
3915     """Gather and return data"""
3916     result = {}
3917     for instance in self.wanted_instances:
3918       remote_info = rpc.call_instance_info(instance.primary_node,
3919                                                 instance.name)
3920       if remote_info and "state" in remote_info:
3921         remote_state = "up"
3922       else:
3923         remote_state = "down"
3924       if instance.status == "down":
3925         config_state = "down"
3926       else:
3927         config_state = "up"
3928
3929       disks = [self._ComputeDiskStatus(instance, None, device)
3930                for device in instance.disks]
3931
3932       idict = {
3933         "name": instance.name,
3934         "config_state": config_state,
3935         "run_state": remote_state,
3936         "pnode": instance.primary_node,
3937         "snodes": instance.secondary_nodes,
3938         "os": instance.os,
3939         "memory": instance.memory,
3940         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3941         "disks": disks,
3942         "vcpus": instance.vcpus,
3943         }
3944
3945       result[instance.name] = idict
3946
3947     return result
3948
3949
3950 class LUSetInstanceParms(LogicalUnit):
3951   """Modifies an instances's parameters.
3952
3953   """
3954   HPATH = "instance-modify"
3955   HTYPE = constants.HTYPE_INSTANCE
3956   _OP_REQP = ["instance_name"]
3957
3958   def BuildHooksEnv(self):
3959     """Build hooks env.
3960
3961     This runs on the master, primary and secondaries.
3962
3963     """
3964     args = dict()
3965     if self.mem:
3966       args['memory'] = self.mem
3967     if self.vcpus:
3968       args['vcpus'] = self.vcpus
3969     if self.do_ip or self.do_bridge:
3970       if self.do_ip:
3971         ip = self.ip
3972       else:
3973         ip = self.instance.nics[0].ip
3974       if self.bridge:
3975         bridge = self.bridge
3976       else:
3977         bridge = self.instance.nics[0].bridge
3978       args['nics'] = [(ip, bridge)]
3979     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3980     nl = [self.sstore.GetMasterNode(),
3981           self.instance.primary_node] + list(self.instance.secondary_nodes)
3982     return env, nl, nl
3983
3984   def CheckPrereq(self):
3985     """Check prerequisites.
3986
3987     This only checks the instance list against the existing names.
3988
3989     """
3990     self.mem = getattr(self.op, "mem", None)
3991     self.vcpus = getattr(self.op, "vcpus", None)
3992     self.ip = getattr(self.op, "ip", None)
3993     self.bridge = getattr(self.op, "bridge", None)
3994     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3995       raise errors.OpPrereqError("No changes submitted")
3996     if self.mem is not None:
3997       try:
3998         self.mem = int(self.mem)
3999       except ValueError, err:
4000         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4001     if self.vcpus is not None:
4002       try:
4003         self.vcpus = int(self.vcpus)
4004       except ValueError, err:
4005         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4006     if self.ip is not None:
4007       self.do_ip = True
4008       if self.ip.lower() == "none":
4009         self.ip = None
4010       else:
4011         if not utils.IsValidIP(self.ip):
4012           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4013     else:
4014       self.do_ip = False
4015     self.do_bridge = (self.bridge is not None)
4016
4017     instance = self.cfg.GetInstanceInfo(
4018       self.cfg.ExpandInstanceName(self.op.instance_name))
4019     if instance is None:
4020       raise errors.OpPrereqError("No such instance name '%s'" %
4021                                  self.op.instance_name)
4022     self.op.instance_name = instance.name
4023     self.instance = instance
4024     return
4025
4026   def Exec(self, feedback_fn):
4027     """Modifies an instance.
4028
4029     All parameters take effect only at the next restart of the instance.
4030     """
4031     result = []
4032     instance = self.instance
4033     if self.mem:
4034       instance.memory = self.mem
4035       result.append(("mem", self.mem))
4036     if self.vcpus:
4037       instance.vcpus = self.vcpus
4038       result.append(("vcpus",  self.vcpus))
4039     if self.do_ip:
4040       instance.nics[0].ip = self.ip
4041       result.append(("ip", self.ip))
4042     if self.bridge:
4043       instance.nics[0].bridge = self.bridge
4044       result.append(("bridge", self.bridge))
4045
4046     self.cfg.AddInstance(instance)
4047
4048     return result
4049
4050
4051 class LUQueryExports(NoHooksLU):
4052   """Query the exports list
4053
4054   """
4055   _OP_REQP = []
4056
4057   def CheckPrereq(self):
4058     """Check that the nodelist contains only existing nodes.
4059
4060     """
4061     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4062
4063   def Exec(self, feedback_fn):
4064     """Compute the list of all the exported system images.
4065
4066     Returns:
4067       a dictionary with the structure node->(export-list)
4068       where export-list is a list of the instances exported on
4069       that node.
4070
4071     """
4072     return rpc.call_export_list(self.nodes)
4073
4074
4075 class LUExportInstance(LogicalUnit):
4076   """Export an instance to an image in the cluster.
4077
4078   """
4079   HPATH = "instance-export"
4080   HTYPE = constants.HTYPE_INSTANCE
4081   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4082
4083   def BuildHooksEnv(self):
4084     """Build hooks env.
4085
4086     This will run on the master, primary node and target node.
4087
4088     """
4089     env = {
4090       "EXPORT_NODE": self.op.target_node,
4091       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4092       }
4093     env.update(_BuildInstanceHookEnvByObject(self.instance))
4094     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4095           self.op.target_node]
4096     return env, nl, nl
4097
4098   def CheckPrereq(self):
4099     """Check prerequisites.
4100
4101     This checks that the instance name is a valid one.
4102
4103     """
4104     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4105     self.instance = self.cfg.GetInstanceInfo(instance_name)
4106     if self.instance is None:
4107       raise errors.OpPrereqError("Instance '%s' not found" %
4108                                  self.op.instance_name)
4109
4110     # node verification
4111     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4112     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4113
4114     if self.dst_node is None:
4115       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4116                                  self.op.target_node)
4117     self.op.target_node = self.dst_node.name
4118
4119   def Exec(self, feedback_fn):
4120     """Export an instance to an image in the cluster.
4121
4122     """
4123     instance = self.instance
4124     dst_node = self.dst_node
4125     src_node = instance.primary_node
4126     # shutdown the instance, unless requested not to do so
4127     if self.op.shutdown:
4128       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4129       self.proc.ChainOpCode(op)
4130
4131     vgname = self.cfg.GetVGName()
4132
4133     snap_disks = []
4134
4135     try:
4136       for disk in instance.disks:
4137         if disk.iv_name == "sda":
4138           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4139           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4140
4141           if not new_dev_name:
4142             logger.Error("could not snapshot block device %s on node %s" %
4143                          (disk.logical_id[1], src_node))
4144           else:
4145             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4146                                       logical_id=(vgname, new_dev_name),
4147                                       physical_id=(vgname, new_dev_name),
4148                                       iv_name=disk.iv_name)
4149             snap_disks.append(new_dev)
4150
4151     finally:
4152       if self.op.shutdown:
4153         op = opcodes.OpStartupInstance(instance_name=instance.name,
4154                                        force=False)
4155         self.proc.ChainOpCode(op)
4156
4157     # TODO: check for size
4158
4159     for dev in snap_disks:
4160       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4161                                            instance):
4162         logger.Error("could not export block device %s from node"
4163                      " %s to node %s" %
4164                      (dev.logical_id[1], src_node, dst_node.name))
4165       if not rpc.call_blockdev_remove(src_node, dev):
4166         logger.Error("could not remove snapshot block device %s from"
4167                      " node %s" % (dev.logical_id[1], src_node))
4168
4169     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4170       logger.Error("could not finalize export for instance %s on node %s" %
4171                    (instance.name, dst_node.name))
4172
4173     nodelist = self.cfg.GetNodeList()
4174     nodelist.remove(dst_node.name)
4175
4176     # on one-node clusters nodelist will be empty after the removal
4177     # if we proceed the backup would be removed because OpQueryExports
4178     # substitutes an empty list with the full cluster node list.
4179     if nodelist:
4180       op = opcodes.OpQueryExports(nodes=nodelist)
4181       exportlist = self.proc.ChainOpCode(op)
4182       for node in exportlist:
4183         if instance.name in exportlist[node]:
4184           if not rpc.call_export_remove(node, instance.name):
4185             logger.Error("could not remove older export for instance %s"
4186                          " on node %s" % (instance.name, node))
4187
4188
4189 class TagsLU(NoHooksLU):
4190   """Generic tags LU.
4191
4192   This is an abstract class which is the parent of all the other tags LUs.
4193
4194   """
4195   def CheckPrereq(self):
4196     """Check prerequisites.
4197
4198     """
4199     if self.op.kind == constants.TAG_CLUSTER:
4200       self.target = self.cfg.GetClusterInfo()
4201     elif self.op.kind == constants.TAG_NODE:
4202       name = self.cfg.ExpandNodeName(self.op.name)
4203       if name is None:
4204         raise errors.OpPrereqError("Invalid node name (%s)" %
4205                                    (self.op.name,))
4206       self.op.name = name
4207       self.target = self.cfg.GetNodeInfo(name)
4208     elif self.op.kind == constants.TAG_INSTANCE:
4209       name = self.cfg.ExpandInstanceName(self.op.name)
4210       if name is None:
4211         raise errors.OpPrereqError("Invalid instance name (%s)" %
4212                                    (self.op.name,))
4213       self.op.name = name
4214       self.target = self.cfg.GetInstanceInfo(name)
4215     else:
4216       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4217                                  str(self.op.kind))
4218
4219
4220 class LUGetTags(TagsLU):
4221   """Returns the tags of a given object.
4222
4223   """
4224   _OP_REQP = ["kind", "name"]
4225
4226   def Exec(self, feedback_fn):
4227     """Returns the tag list.
4228
4229     """
4230     return self.target.GetTags()
4231
4232
4233 class LUSearchTags(NoHooksLU):
4234   """Searches the tags for a given pattern.
4235
4236   """
4237   _OP_REQP = ["pattern"]
4238
4239   def CheckPrereq(self):
4240     """Check prerequisites.
4241
4242     This checks the pattern passed for validity by compiling it.
4243
4244     """
4245     try:
4246       self.re = re.compile(self.op.pattern)
4247     except re.error, err:
4248       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4249                                  (self.op.pattern, err))
4250
4251   def Exec(self, feedback_fn):
4252     """Returns the tag list.
4253
4254     """
4255     cfg = self.cfg
4256     tgts = [("/cluster", cfg.GetClusterInfo())]
4257     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4258     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4259     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4260     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4261     results = []
4262     for path, target in tgts:
4263       for tag in target.GetTags():
4264         if self.re.search(tag):
4265           results.append((path, tag))
4266     return results
4267
4268
4269 class LUAddTags(TagsLU):
4270   """Sets a tag on a given object.
4271
4272   """
4273   _OP_REQP = ["kind", "name", "tags"]
4274
4275   def CheckPrereq(self):
4276     """Check prerequisites.
4277
4278     This checks the type and length of the tag name and value.
4279
4280     """
4281     TagsLU.CheckPrereq(self)
4282     for tag in self.op.tags:
4283       objects.TaggableObject.ValidateTag(tag)
4284
4285   def Exec(self, feedback_fn):
4286     """Sets the tag.
4287
4288     """
4289     try:
4290       for tag in self.op.tags:
4291         self.target.AddTag(tag)
4292     except errors.TagError, err:
4293       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4294     try:
4295       self.cfg.Update(self.target)
4296     except errors.ConfigurationError:
4297       raise errors.OpRetryError("There has been a modification to the"
4298                                 " config file and the operation has been"
4299                                 " aborted. Please retry.")
4300
4301
4302 class LUDelTags(TagsLU):
4303   """Delete a list of tags from a given object.
4304
4305   """
4306   _OP_REQP = ["kind", "name", "tags"]
4307
4308   def CheckPrereq(self):
4309     """Check prerequisites.
4310
4311     This checks that we have the given tag.
4312
4313     """
4314     TagsLU.CheckPrereq(self)
4315     for tag in self.op.tags:
4316       objects.TaggableObject.ValidateTag(tag)
4317     del_tags = frozenset(self.op.tags)
4318     cur_tags = self.target.GetTags()
4319     if not del_tags <= cur_tags:
4320       diff_tags = del_tags - cur_tags
4321       diff_names = ["'%s'" % tag for tag in diff_tags]
4322       diff_names.sort()
4323       raise errors.OpPrereqError("Tag(s) %s not found" %
4324                                  (",".join(diff_names)))
4325
4326   def Exec(self, feedback_fn):
4327     """Remove the tag from the object.
4328
4329     """
4330     for tag in self.op.tags:
4331       self.target.RemoveTag(tag)
4332     try:
4333       self.cfg.Update(self.target)
4334     except errors.ConfigurationError:
4335       raise errors.OpRetryError("There has been a modification to the"
4336                                 " config file and the operation has been"
4337                                 " aborted. Please retry.")