Fix a typo in a warning message
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _AddHostToEtcHosts(hostname):
167   """Wrapper around utils.SetEtcHostsEntry.
168
169   """
170   hi = utils.HostInfo(name=hostname)
171   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172
173
174 def _RemoveHostFromEtcHosts(hostname):
175   """Wrapper around utils.RemoveEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181
182
183 def _GetWantedNodes(lu, nodes):
184   """Returns list of checked and expanded node names.
185
186   Args:
187     nodes: List of nodes (strings) or None for all
188
189   """
190   if not isinstance(nodes, list):
191     raise errors.OpPrereqError("Invalid argument type 'nodes'")
192
193   if nodes:
194     wanted = []
195
196     for name in nodes:
197       node = lu.cfg.ExpandNodeName(name)
198       if node is None:
199         raise errors.OpPrereqError("No such node name '%s'" % name)
200       wanted.append(node)
201
202   else:
203     wanted = lu.cfg.GetNodeList()
204   return utils.NiceSort(wanted)
205
206
207 def _GetWantedInstances(lu, instances):
208   """Returns list of checked and expanded instance names.
209
210   Args:
211     instances: List of instances (strings) or None for all
212
213   """
214   if not isinstance(instances, list):
215     raise errors.OpPrereqError("Invalid argument type 'instances'")
216
217   if instances:
218     wanted = []
219
220     for name in instances:
221       instance = lu.cfg.ExpandInstanceName(name)
222       if instance is None:
223         raise errors.OpPrereqError("No such instance name '%s'" % name)
224       wanted.append(instance)
225
226   else:
227     wanted = lu.cfg.GetInstanceList()
228   return utils.NiceSort(wanted)
229
230
231 def _CheckOutputFields(static, dynamic, selected):
232   """Checks whether all selected fields are valid.
233
234   Args:
235     static: Static fields
236     dynamic: Dynamic fields
237
238   """
239   static_fields = frozenset(static)
240   dynamic_fields = frozenset(dynamic)
241
242   all_fields = static_fields | dynamic_fields
243
244   if not all_fields.issuperset(selected):
245     raise errors.OpPrereqError("Unknown output fields selected: %s"
246                                % ",".join(frozenset(selected).
247                                           difference(all_fields)))
248
249
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251                           memory, vcpus, nics):
252   """Builds instance related env variables for hooks from single variables.
253
254   Args:
255     secondary_nodes: List of secondary nodes as strings
256   """
257   env = {
258     "OP_TARGET": name,
259     "INSTANCE_NAME": name,
260     "INSTANCE_PRIMARY": primary_node,
261     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262     "INSTANCE_OS_TYPE": os_type,
263     "INSTANCE_STATUS": status,
264     "INSTANCE_MEMORY": memory,
265     "INSTANCE_VCPUS": vcpus,
266   }
267
268   if nics:
269     nic_count = len(nics)
270     for idx, (ip, bridge) in enumerate(nics):
271       if ip is None:
272         ip = ""
273       env["INSTANCE_NIC%d_IP" % idx] = ip
274       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275   else:
276     nic_count = 0
277
278   env["INSTANCE_NIC_COUNT"] = nic_count
279
280   return env
281
282
283 def _BuildInstanceHookEnvByObject(instance, override=None):
284   """Builds instance related env variables for hooks from an object.
285
286   Args:
287     instance: objects.Instance object of instance
288     override: dict of values to override
289   """
290   args = {
291     'name': instance.name,
292     'primary_node': instance.primary_node,
293     'secondary_nodes': instance.secondary_nodes,
294     'os_type': instance.os,
295     'status': instance.os,
296     'memory': instance.memory,
297     'vcpus': instance.vcpus,
298     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
299   }
300   if override:
301     args.update(override)
302   return _BuildInstanceHookEnv(**args)
303
304
305 def _UpdateKnownHosts(fullnode, ip, pubkey):
306   """Ensure a node has a correct known_hosts entry.
307
308   Args:
309     fullnode - Fully qualified domain name of host. (str)
310     ip       - IPv4 address of host (str)
311     pubkey   - the public key of the cluster
312
313   """
314   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
316   else:
317     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
318
319   inthere = False
320
321   save_lines = []
322   add_lines = []
323   removed = False
324
325   for rawline in f:
326     logger.Debug('read %s' % (repr(rawline),))
327
328     parts = rawline.rstrip('\r\n').split()
329
330     # Ignore unwanted lines
331     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
332       fields = parts[0].split(',')
333       key = parts[2]
334
335       haveall = True
336       havesome = False
337       for spec in [ ip, fullnode ]:
338         if spec not in fields:
339           haveall = False
340         if spec in fields:
341           havesome = True
342
343       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344       if haveall and key == pubkey:
345         inthere = True
346         save_lines.append(rawline)
347         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
348         continue
349
350       if havesome and (not haveall or key != pubkey):
351         removed = True
352         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
353         continue
354
355     save_lines.append(rawline)
356
357   if not inthere:
358     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
360
361   if removed:
362     save_lines = save_lines + add_lines
363
364     # Write a new file and replace old.
365     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
366                                    constants.DATA_DIR)
367     newfile = os.fdopen(fd, 'w')
368     try:
369       newfile.write(''.join(save_lines))
370     finally:
371       newfile.close()
372     logger.Debug("Wrote new known_hosts.")
373     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
374
375   elif add_lines:
376     # Simply appending a new line will do the trick.
377     f.seek(0, 2)
378     for add in add_lines:
379       f.write(add)
380
381   f.close()
382
383
384 def _HasValidVG(vglist, vgname):
385   """Checks if the volume group list is valid.
386
387   A non-None return value means there's an error, and the return value
388   is the error message.
389
390   """
391   vgsize = vglist.get(vgname, None)
392   if vgsize is None:
393     return "volume group '%s' missing" % vgname
394   elif vgsize < 20480:
395     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
396             (vgname, vgsize))
397   return None
398
399
400 def _InitSSHSetup(node):
401   """Setup the SSH configuration for the cluster.
402
403
404   This generates a dsa keypair for root, adds the pub key to the
405   permitted hosts and adds the hostkey to its own known hosts.
406
407   Args:
408     node: the name of this host as a fqdn
409
410   """
411   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
412
413   for name in priv_key, pub_key:
414     if os.path.exists(name):
415       utils.CreateBackup(name)
416     utils.RemoveFile(name)
417
418   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
419                          "-f", priv_key,
420                          "-q", "-N", ""])
421   if result.failed:
422     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
423                              result.output)
424
425   f = open(pub_key, 'r')
426   try:
427     utils.AddAuthorizedKey(auth_keys, f.read(8192))
428   finally:
429     f.close()
430
431
432 def _InitGanetiServerSetup(ss):
433   """Setup the necessary configuration for the initial node daemon.
434
435   This creates the nodepass file containing the shared password for
436   the cluster and also generates the SSL certificate.
437
438   """
439   # Create pseudo random password
440   randpass = sha.new(os.urandom(64)).hexdigest()
441   # and write it into sstore
442   ss.SetKey(ss.SS_NODED_PASS, randpass)
443
444   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445                          "-days", str(365*5), "-nodes", "-x509",
446                          "-keyout", constants.SSL_CERT_FILE,
447                          "-out", constants.SSL_CERT_FILE, "-batch"])
448   if result.failed:
449     raise errors.OpExecError("could not generate server ssl cert, command"
450                              " %s had exitcode %s and error message %s" %
451                              (result.cmd, result.exit_code, result.output))
452
453   os.chmod(constants.SSL_CERT_FILE, 0400)
454
455   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
456
457   if result.failed:
458     raise errors.OpExecError("Could not start the node daemon, command %s"
459                              " had exitcode %s and error %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462
463 def _CheckInstanceBridgesExist(instance):
464   """Check that the brigdes needed by an instance exist.
465
466   """
467   # check bridges existance
468   brlist = [nic.bridge for nic in instance.nics]
469   if not rpc.call_bridges_exist(instance.primary_node, brlist):
470     raise errors.OpPrereqError("one or more target bridges %s does not"
471                                " exist on destination node '%s'" %
472                                (brlist, instance.primary_node))
473
474
475 class LUInitCluster(LogicalUnit):
476   """Initialise the cluster.
477
478   """
479   HPATH = "cluster-init"
480   HTYPE = constants.HTYPE_CLUSTER
481   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482               "def_bridge", "master_netdev"]
483   REQ_CLUSTER = False
484
485   def BuildHooksEnv(self):
486     """Build hooks env.
487
488     Notes: Since we don't require a cluster, we must manually add
489     ourselves in the post-run node list.
490
491     """
492     env = {"OP_TARGET": self.op.cluster_name}
493     return env, [], [self.hostname.name]
494
495   def CheckPrereq(self):
496     """Verify that the passed name is a valid one.
497
498     """
499     if config.ConfigWriter.IsCluster():
500       raise errors.OpPrereqError("Cluster is already initialised")
501
502     self.hostname = hostname = utils.HostInfo()
503
504     if hostname.ip.startswith("127."):
505       raise errors.OpPrereqError("This host's IP resolves to the private"
506                                  " range (%s). Please fix DNS or /etc/hosts." %
507                                  (hostname.ip,))
508
509     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
510
511     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
512                          constants.DEFAULT_NODED_PORT):
513       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
514                                  " to %s,\nbut this ip address does not"
515                                  " belong to this host."
516                                  " Aborting." % hostname.ip)
517
518     secondary_ip = getattr(self.op, "secondary_ip", None)
519     if secondary_ip and not utils.IsValidIP(secondary_ip):
520       raise errors.OpPrereqError("Invalid secondary ip given")
521     if (secondary_ip and
522         secondary_ip != hostname.ip and
523         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
524                            constants.DEFAULT_NODED_PORT))):
525       raise errors.OpPrereqError("You gave %s as secondary IP,\n"
526                                  "but it does not belong to this host." %
527                                  secondary_ip)
528     self.secondary_ip = secondary_ip
529
530     # checks presence of the volume group given
531     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
532
533     if vgstatus:
534       raise errors.OpPrereqError("Error: %s" % vgstatus)
535
536     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
537                     self.op.mac_prefix):
538       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
539                                  self.op.mac_prefix)
540
541     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
542       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
543                                  self.op.hypervisor_type)
544
545     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
546     if result.failed:
547       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
548                                  (self.op.master_netdev,
549                                   result.output.strip()))
550
551     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
552             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
553       raise errors.OpPrereqError("Init.d script '%s' missing or not "
554                                  "executable." % constants.NODE_INITD_SCRIPT)
555
556   def Exec(self, feedback_fn):
557     """Initialize the cluster.
558
559     """
560     clustername = self.clustername
561     hostname = self.hostname
562
563     # set up the simple store
564     self.sstore = ss = ssconf.SimpleStore()
565     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
566     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
567     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
568     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
569     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
570
571     # set up the inter-node password and certificate
572     _InitGanetiServerSetup(ss)
573
574     # start the master ip
575     rpc.call_node_start_master(hostname.name)
576
577     # set up ssh config and /etc/hosts
578     f = open(constants.SSH_HOST_RSA_PUB, 'r')
579     try:
580       sshline = f.read()
581     finally:
582       f.close()
583     sshkey = sshline.split(" ")[1]
584
585     _AddHostToEtcHosts(hostname.name)
586
587     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
588
589     _InitSSHSetup(hostname.name)
590
591     # init of cluster config file
592     self.cfg = cfgw = config.ConfigWriter()
593     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
594                     sshkey, self.op.mac_prefix,
595                     self.op.vg_name, self.op.def_bridge)
596
597
598 class LUDestroyCluster(NoHooksLU):
599   """Logical unit for destroying the cluster.
600
601   """
602   _OP_REQP = []
603
604   def CheckPrereq(self):
605     """Check prerequisites.
606
607     This checks whether the cluster is empty.
608
609     Any errors are signalled by raising errors.OpPrereqError.
610
611     """
612     master = self.sstore.GetMasterNode()
613
614     nodelist = self.cfg.GetNodeList()
615     if len(nodelist) != 1 or nodelist[0] != master:
616       raise errors.OpPrereqError("There are still %d node(s) in"
617                                  " this cluster." % (len(nodelist) - 1))
618     instancelist = self.cfg.GetInstanceList()
619     if instancelist:
620       raise errors.OpPrereqError("There are still %d instance(s) in"
621                                  " this cluster." % len(instancelist))
622
623   def Exec(self, feedback_fn):
624     """Destroys the cluster.
625
626     """
627     master = self.sstore.GetMasterNode()
628     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
629     utils.CreateBackup(priv_key)
630     utils.CreateBackup(pub_key)
631     rpc.call_node_leave_cluster(master)
632
633
634 class LUVerifyCluster(NoHooksLU):
635   """Verifies the cluster status.
636
637   """
638   _OP_REQP = []
639
640   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
641                   remote_version, feedback_fn):
642     """Run multiple tests against a node.
643
644     Test list:
645       - compares ganeti version
646       - checks vg existance and size > 20G
647       - checks config file checksum
648       - checks ssh to other nodes
649
650     Args:
651       node: name of the node to check
652       file_list: required list of files
653       local_cksum: dictionary of local files and their checksums
654
655     """
656     # compares ganeti version
657     local_version = constants.PROTOCOL_VERSION
658     if not remote_version:
659       feedback_fn(" - ERROR: connection to %s failed" % (node))
660       return True
661
662     if local_version != remote_version:
663       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
664                       (local_version, node, remote_version))
665       return True
666
667     # checks vg existance and size > 20G
668
669     bad = False
670     if not vglist:
671       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
672                       (node,))
673       bad = True
674     else:
675       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
676       if vgstatus:
677         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
678         bad = True
679
680     # checks config file checksum
681     # checks ssh to any
682
683     if 'filelist' not in node_result:
684       bad = True
685       feedback_fn("  - ERROR: node hasn't returned file checksum data")
686     else:
687       remote_cksum = node_result['filelist']
688       for file_name in file_list:
689         if file_name not in remote_cksum:
690           bad = True
691           feedback_fn("  - ERROR: file '%s' missing" % file_name)
692         elif remote_cksum[file_name] != local_cksum[file_name]:
693           bad = True
694           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
695
696     if 'nodelist' not in node_result:
697       bad = True
698       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
699     else:
700       if node_result['nodelist']:
701         bad = True
702         for node in node_result['nodelist']:
703           feedback_fn("  - ERROR: communication with node '%s': %s" %
704                           (node, node_result['nodelist'][node]))
705     hyp_result = node_result.get('hypervisor', None)
706     if hyp_result is not None:
707       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
708     return bad
709
710   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
711     """Verify an instance.
712
713     This function checks to see if the required block devices are
714     available on the instance's node.
715
716     """
717     bad = False
718
719     instancelist = self.cfg.GetInstanceList()
720     if not instance in instancelist:
721       feedback_fn("  - ERROR: instance %s not in instance list %s" %
722                       (instance, instancelist))
723       bad = True
724
725     instanceconfig = self.cfg.GetInstanceInfo(instance)
726     node_current = instanceconfig.primary_node
727
728     node_vol_should = {}
729     instanceconfig.MapLVsByNode(node_vol_should)
730
731     for node in node_vol_should:
732       for volume in node_vol_should[node]:
733         if node not in node_vol_is or volume not in node_vol_is[node]:
734           feedback_fn("  - ERROR: volume %s missing on node %s" %
735                           (volume, node))
736           bad = True
737
738     if not instanceconfig.status == 'down':
739       if not instance in node_instance[node_current]:
740         feedback_fn("  - ERROR: instance %s not running on node %s" %
741                         (instance, node_current))
742         bad = True
743
744     for node in node_instance:
745       if (not node == node_current):
746         if instance in node_instance[node]:
747           feedback_fn("  - ERROR: instance %s should not run on node %s" %
748                           (instance, node))
749           bad = True
750
751     return bad
752
753   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
754     """Verify if there are any unknown volumes in the cluster.
755
756     The .os, .swap and backup volumes are ignored. All other volumes are
757     reported as unknown.
758
759     """
760     bad = False
761
762     for node in node_vol_is:
763       for volume in node_vol_is[node]:
764         if node not in node_vol_should or volume not in node_vol_should[node]:
765           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
766                       (volume, node))
767           bad = True
768     return bad
769
770   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
771     """Verify the list of running instances.
772
773     This checks what instances are running but unknown to the cluster.
774
775     """
776     bad = False
777     for node in node_instance:
778       for runninginstance in node_instance[node]:
779         if runninginstance not in instancelist:
780           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
781                           (runninginstance, node))
782           bad = True
783     return bad
784
785   def CheckPrereq(self):
786     """Check prerequisites.
787
788     This has no prerequisites.
789
790     """
791     pass
792
793   def Exec(self, feedback_fn):
794     """Verify integrity of cluster, performing various test on nodes.
795
796     """
797     bad = False
798     feedback_fn("* Verifying global settings")
799     self.cfg.VerifyConfig()
800
801     vg_name = self.cfg.GetVGName()
802     nodelist = utils.NiceSort(self.cfg.GetNodeList())
803     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
804     node_volume = {}
805     node_instance = {}
806
807     # FIXME: verify OS list
808     # do local checksums
809     file_names = list(self.sstore.GetFileList())
810     file_names.append(constants.SSL_CERT_FILE)
811     file_names.append(constants.CLUSTER_CONF_FILE)
812     local_checksums = utils.FingerprintFiles(file_names)
813
814     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
815     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
816     all_instanceinfo = rpc.call_instance_list(nodelist)
817     all_vglist = rpc.call_vg_list(nodelist)
818     node_verify_param = {
819       'filelist': file_names,
820       'nodelist': nodelist,
821       'hypervisor': None,
822       }
823     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
824     all_rversion = rpc.call_version(nodelist)
825
826     for node in nodelist:
827       feedback_fn("* Verifying node %s" % node)
828       result = self._VerifyNode(node, file_names, local_checksums,
829                                 all_vglist[node], all_nvinfo[node],
830                                 all_rversion[node], feedback_fn)
831       bad = bad or result
832
833       # node_volume
834       volumeinfo = all_volumeinfo[node]
835
836       if type(volumeinfo) != dict:
837         feedback_fn("  - ERROR: connection to %s failed" % (node,))
838         bad = True
839         continue
840
841       node_volume[node] = volumeinfo
842
843       # node_instance
844       nodeinstance = all_instanceinfo[node]
845       if type(nodeinstance) != list:
846         feedback_fn("  - ERROR: connection to %s failed" % (node,))
847         bad = True
848         continue
849
850       node_instance[node] = nodeinstance
851
852     node_vol_should = {}
853
854     for instance in instancelist:
855       feedback_fn("* Verifying instance %s" % instance)
856       result =  self._VerifyInstance(instance, node_volume, node_instance,
857                                      feedback_fn)
858       bad = bad or result
859
860       inst_config = self.cfg.GetInstanceInfo(instance)
861
862       inst_config.MapLVsByNode(node_vol_should)
863
864     feedback_fn("* Verifying orphan volumes")
865     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
866                                        feedback_fn)
867     bad = bad or result
868
869     feedback_fn("* Verifying remaining instances")
870     result = self._VerifyOrphanInstances(instancelist, node_instance,
871                                          feedback_fn)
872     bad = bad or result
873
874     return int(bad)
875
876
877 class LURenameCluster(LogicalUnit):
878   """Rename the cluster.
879
880   """
881   HPATH = "cluster-rename"
882   HTYPE = constants.HTYPE_CLUSTER
883   _OP_REQP = ["name"]
884
885   def BuildHooksEnv(self):
886     """Build hooks env.
887
888     """
889     env = {
890       "OP_TARGET": self.op.sstore.GetClusterName(),
891       "NEW_NAME": self.op.name,
892       }
893     mn = self.sstore.GetMasterNode()
894     return env, [mn], [mn]
895
896   def CheckPrereq(self):
897     """Verify that the passed name is a valid one.
898
899     """
900     hostname = utils.HostInfo(self.op.name)
901
902     new_name = hostname.name
903     self.ip = new_ip = hostname.ip
904     old_name = self.sstore.GetClusterName()
905     old_ip = self.sstore.GetMasterIP()
906     if new_name == old_name and new_ip == old_ip:
907       raise errors.OpPrereqError("Neither the name nor the IP address of the"
908                                  " cluster has changed")
909     if new_ip != old_ip:
910       result = utils.RunCmd(["fping", "-q", new_ip])
911       if not result.failed:
912         raise errors.OpPrereqError("The given cluster IP address (%s) is"
913                                    " reachable on the network. Aborting." %
914                                    new_ip)
915
916     self.op.name = new_name
917
918   def Exec(self, feedback_fn):
919     """Rename the cluster.
920
921     """
922     clustername = self.op.name
923     ip = self.ip
924     ss = self.sstore
925
926     # shutdown the master IP
927     master = ss.GetMasterNode()
928     if not rpc.call_node_stop_master(master):
929       raise errors.OpExecError("Could not disable the master role")
930
931     try:
932       # modify the sstore
933       ss.SetKey(ss.SS_MASTER_IP, ip)
934       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
935
936       # Distribute updated ss config to all nodes
937       myself = self.cfg.GetNodeInfo(master)
938       dist_nodes = self.cfg.GetNodeList()
939       if myself.name in dist_nodes:
940         dist_nodes.remove(myself.name)
941
942       logger.Debug("Copying updated ssconf data to all nodes")
943       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
944         fname = ss.KeyToFilename(keyname)
945         result = rpc.call_upload_file(dist_nodes, fname)
946         for to_node in dist_nodes:
947           if not result[to_node]:
948             logger.Error("copy of file %s to node %s failed" %
949                          (fname, to_node))
950     finally:
951       if not rpc.call_node_start_master(master):
952         logger.Error("Could not re-enable the master role on the master,\n"
953                      "please restart manually.")
954
955
956 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
957   """Sleep and poll for an instance's disk to sync.
958
959   """
960   if not instance.disks:
961     return True
962
963   if not oneshot:
964     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
965
966   node = instance.primary_node
967
968   for dev in instance.disks:
969     cfgw.SetDiskID(dev, node)
970
971   retries = 0
972   while True:
973     max_time = 0
974     done = True
975     cumul_degraded = False
976     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
977     if not rstats:
978       proc.LogWarning("Can't get any data from node %s" % node)
979       retries += 1
980       if retries >= 10:
981         raise errors.RemoteError("Can't contact node %s for mirror data,"
982                                  " aborting." % node)
983       time.sleep(6)
984       continue
985     retries = 0
986     for i in range(len(rstats)):
987       mstat = rstats[i]
988       if mstat is None:
989         proc.LogWarning("Can't compute data for node %s/%s" %
990                         (node, instance.disks[i].iv_name))
991         continue
992       # we ignore the ldisk parameter
993       perc_done, est_time, is_degraded, _ = mstat
994       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
995       if perc_done is not None:
996         done = False
997         if est_time is not None:
998           rem_time = "%d estimated seconds remaining" % est_time
999           max_time = est_time
1000         else:
1001           rem_time = "no time estimate"
1002         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1003                      (instance.disks[i].iv_name, perc_done, rem_time))
1004     if done or oneshot:
1005       break
1006
1007     if unlock:
1008       utils.Unlock('cmd')
1009     try:
1010       time.sleep(min(60, max_time))
1011     finally:
1012       if unlock:
1013         utils.Lock('cmd')
1014
1015   if done:
1016     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1017   return not cumul_degraded
1018
1019
1020 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1021   """Check that mirrors are not degraded.
1022
1023   The ldisk parameter, if True, will change the test from the
1024   is_degraded attribute (which represents overall non-ok status for
1025   the device(s)) to the ldisk (representing the local storage status).
1026
1027   """
1028   cfgw.SetDiskID(dev, node)
1029   if ldisk:
1030     idx = 6
1031   else:
1032     idx = 5
1033
1034   result = True
1035   if on_primary or dev.AssembleOnSecondary():
1036     rstats = rpc.call_blockdev_find(node, dev)
1037     if not rstats:
1038       logger.ToStderr("Can't get any data from node %s" % node)
1039       result = False
1040     else:
1041       result = result and (not rstats[idx])
1042   if dev.children:
1043     for child in dev.children:
1044       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1045
1046   return result
1047
1048
1049 class LUDiagnoseOS(NoHooksLU):
1050   """Logical unit for OS diagnose/query.
1051
1052   """
1053   _OP_REQP = []
1054
1055   def CheckPrereq(self):
1056     """Check prerequisites.
1057
1058     This always succeeds, since this is a pure query LU.
1059
1060     """
1061     return
1062
1063   def Exec(self, feedback_fn):
1064     """Compute the list of OSes.
1065
1066     """
1067     node_list = self.cfg.GetNodeList()
1068     node_data = rpc.call_os_diagnose(node_list)
1069     if node_data == False:
1070       raise errors.OpExecError("Can't gather the list of OSes")
1071     return node_data
1072
1073
1074 class LURemoveNode(LogicalUnit):
1075   """Logical unit for removing a node.
1076
1077   """
1078   HPATH = "node-remove"
1079   HTYPE = constants.HTYPE_NODE
1080   _OP_REQP = ["node_name"]
1081
1082   def BuildHooksEnv(self):
1083     """Build hooks env.
1084
1085     This doesn't run on the target node in the pre phase as a failed
1086     node would not allows itself to run.
1087
1088     """
1089     env = {
1090       "OP_TARGET": self.op.node_name,
1091       "NODE_NAME": self.op.node_name,
1092       }
1093     all_nodes = self.cfg.GetNodeList()
1094     all_nodes.remove(self.op.node_name)
1095     return env, all_nodes, all_nodes
1096
1097   def CheckPrereq(self):
1098     """Check prerequisites.
1099
1100     This checks:
1101      - the node exists in the configuration
1102      - it does not have primary or secondary instances
1103      - it's not the master
1104
1105     Any errors are signalled by raising errors.OpPrereqError.
1106
1107     """
1108     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1109     if node is None:
1110       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1111
1112     instance_list = self.cfg.GetInstanceList()
1113
1114     masternode = self.sstore.GetMasterNode()
1115     if node.name == masternode:
1116       raise errors.OpPrereqError("Node is the master node,"
1117                                  " you need to failover first.")
1118
1119     for instance_name in instance_list:
1120       instance = self.cfg.GetInstanceInfo(instance_name)
1121       if node.name == instance.primary_node:
1122         raise errors.OpPrereqError("Instance %s still running on the node,"
1123                                    " please remove first." % instance_name)
1124       if node.name in instance.secondary_nodes:
1125         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1126                                    " please remove first." % instance_name)
1127     self.op.node_name = node.name
1128     self.node = node
1129
1130   def Exec(self, feedback_fn):
1131     """Removes the node from the cluster.
1132
1133     """
1134     node = self.node
1135     logger.Info("stopping the node daemon and removing configs from node %s" %
1136                 node.name)
1137
1138     rpc.call_node_leave_cluster(node.name)
1139
1140     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1141
1142     logger.Info("Removing node %s from config" % node.name)
1143
1144     self.cfg.RemoveNode(node.name)
1145
1146     _RemoveHostFromEtcHosts(node.name)
1147
1148
1149 class LUQueryNodes(NoHooksLU):
1150   """Logical unit for querying nodes.
1151
1152   """
1153   _OP_REQP = ["output_fields", "names"]
1154
1155   def CheckPrereq(self):
1156     """Check prerequisites.
1157
1158     This checks that the fields required are valid output fields.
1159
1160     """
1161     self.dynamic_fields = frozenset(["dtotal", "dfree",
1162                                      "mtotal", "mnode", "mfree",
1163                                      "bootid"])
1164
1165     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1166                                "pinst_list", "sinst_list",
1167                                "pip", "sip"],
1168                        dynamic=self.dynamic_fields,
1169                        selected=self.op.output_fields)
1170
1171     self.wanted = _GetWantedNodes(self, self.op.names)
1172
1173   def Exec(self, feedback_fn):
1174     """Computes the list of nodes and their attributes.
1175
1176     """
1177     nodenames = self.wanted
1178     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1179
1180     # begin data gathering
1181
1182     if self.dynamic_fields.intersection(self.op.output_fields):
1183       live_data = {}
1184       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1185       for name in nodenames:
1186         nodeinfo = node_data.get(name, None)
1187         if nodeinfo:
1188           live_data[name] = {
1189             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1190             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1191             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1192             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1193             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1194             "bootid": nodeinfo['bootid'],
1195             }
1196         else:
1197           live_data[name] = {}
1198     else:
1199       live_data = dict.fromkeys(nodenames, {})
1200
1201     node_to_primary = dict([(name, set()) for name in nodenames])
1202     node_to_secondary = dict([(name, set()) for name in nodenames])
1203
1204     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1205                              "sinst_cnt", "sinst_list"))
1206     if inst_fields & frozenset(self.op.output_fields):
1207       instancelist = self.cfg.GetInstanceList()
1208
1209       for instance_name in instancelist:
1210         inst = self.cfg.GetInstanceInfo(instance_name)
1211         if inst.primary_node in node_to_primary:
1212           node_to_primary[inst.primary_node].add(inst.name)
1213         for secnode in inst.secondary_nodes:
1214           if secnode in node_to_secondary:
1215             node_to_secondary[secnode].add(inst.name)
1216
1217     # end data gathering
1218
1219     output = []
1220     for node in nodelist:
1221       node_output = []
1222       for field in self.op.output_fields:
1223         if field == "name":
1224           val = node.name
1225         elif field == "pinst_list":
1226           val = list(node_to_primary[node.name])
1227         elif field == "sinst_list":
1228           val = list(node_to_secondary[node.name])
1229         elif field == "pinst_cnt":
1230           val = len(node_to_primary[node.name])
1231         elif field == "sinst_cnt":
1232           val = len(node_to_secondary[node.name])
1233         elif field == "pip":
1234           val = node.primary_ip
1235         elif field == "sip":
1236           val = node.secondary_ip
1237         elif field in self.dynamic_fields:
1238           val = live_data[node.name].get(field, None)
1239         else:
1240           raise errors.ParameterError(field)
1241         node_output.append(val)
1242       output.append(node_output)
1243
1244     return output
1245
1246
1247 class LUQueryNodeVolumes(NoHooksLU):
1248   """Logical unit for getting volumes on node(s).
1249
1250   """
1251   _OP_REQP = ["nodes", "output_fields"]
1252
1253   def CheckPrereq(self):
1254     """Check prerequisites.
1255
1256     This checks that the fields required are valid output fields.
1257
1258     """
1259     self.nodes = _GetWantedNodes(self, self.op.nodes)
1260
1261     _CheckOutputFields(static=["node"],
1262                        dynamic=["phys", "vg", "name", "size", "instance"],
1263                        selected=self.op.output_fields)
1264
1265
1266   def Exec(self, feedback_fn):
1267     """Computes the list of nodes and their attributes.
1268
1269     """
1270     nodenames = self.nodes
1271     volumes = rpc.call_node_volumes(nodenames)
1272
1273     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1274              in self.cfg.GetInstanceList()]
1275
1276     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1277
1278     output = []
1279     for node in nodenames:
1280       if node not in volumes or not volumes[node]:
1281         continue
1282
1283       node_vols = volumes[node][:]
1284       node_vols.sort(key=lambda vol: vol['dev'])
1285
1286       for vol in node_vols:
1287         node_output = []
1288         for field in self.op.output_fields:
1289           if field == "node":
1290             val = node
1291           elif field == "phys":
1292             val = vol['dev']
1293           elif field == "vg":
1294             val = vol['vg']
1295           elif field == "name":
1296             val = vol['name']
1297           elif field == "size":
1298             val = int(float(vol['size']))
1299           elif field == "instance":
1300             for inst in ilist:
1301               if node not in lv_by_node[inst]:
1302                 continue
1303               if vol['name'] in lv_by_node[inst][node]:
1304                 val = inst.name
1305                 break
1306             else:
1307               val = '-'
1308           else:
1309             raise errors.ParameterError(field)
1310           node_output.append(str(val))
1311
1312         output.append(node_output)
1313
1314     return output
1315
1316
1317 class LUAddNode(LogicalUnit):
1318   """Logical unit for adding node to the cluster.
1319
1320   """
1321   HPATH = "node-add"
1322   HTYPE = constants.HTYPE_NODE
1323   _OP_REQP = ["node_name"]
1324
1325   def BuildHooksEnv(self):
1326     """Build hooks env.
1327
1328     This will run on all nodes before, and on all nodes + the new node after.
1329
1330     """
1331     env = {
1332       "OP_TARGET": self.op.node_name,
1333       "NODE_NAME": self.op.node_name,
1334       "NODE_PIP": self.op.primary_ip,
1335       "NODE_SIP": self.op.secondary_ip,
1336       }
1337     nodes_0 = self.cfg.GetNodeList()
1338     nodes_1 = nodes_0 + [self.op.node_name, ]
1339     return env, nodes_0, nodes_1
1340
1341   def CheckPrereq(self):
1342     """Check prerequisites.
1343
1344     This checks:
1345      - the new node is not already in the config
1346      - it is resolvable
1347      - its parameters (single/dual homed) matches the cluster
1348
1349     Any errors are signalled by raising errors.OpPrereqError.
1350
1351     """
1352     node_name = self.op.node_name
1353     cfg = self.cfg
1354
1355     dns_data = utils.HostInfo(node_name)
1356
1357     node = dns_data.name
1358     primary_ip = self.op.primary_ip = dns_data.ip
1359     secondary_ip = getattr(self.op, "secondary_ip", None)
1360     if secondary_ip is None:
1361       secondary_ip = primary_ip
1362     if not utils.IsValidIP(secondary_ip):
1363       raise errors.OpPrereqError("Invalid secondary IP given")
1364     self.op.secondary_ip = secondary_ip
1365     node_list = cfg.GetNodeList()
1366     if node in node_list:
1367       raise errors.OpPrereqError("Node %s is already in the configuration"
1368                                  % node)
1369
1370     for existing_node_name in node_list:
1371       existing_node = cfg.GetNodeInfo(existing_node_name)
1372       if (existing_node.primary_ip == primary_ip or
1373           existing_node.secondary_ip == primary_ip or
1374           existing_node.primary_ip == secondary_ip or
1375           existing_node.secondary_ip == secondary_ip):
1376         raise errors.OpPrereqError("New node ip address(es) conflict with"
1377                                    " existing node %s" % existing_node.name)
1378
1379     # check that the type of the node (single versus dual homed) is the
1380     # same as for the master
1381     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1382     master_singlehomed = myself.secondary_ip == myself.primary_ip
1383     newbie_singlehomed = secondary_ip == primary_ip
1384     if master_singlehomed != newbie_singlehomed:
1385       if master_singlehomed:
1386         raise errors.OpPrereqError("The master has no private ip but the"
1387                                    " new node has one")
1388       else:
1389         raise errors.OpPrereqError("The master has a private ip but the"
1390                                    " new node doesn't have one")
1391
1392     # checks reachablity
1393     if not utils.TcpPing(utils.HostInfo().name,
1394                          primary_ip,
1395                          constants.DEFAULT_NODED_PORT):
1396       raise errors.OpPrereqError("Node not reachable by ping")
1397
1398     if not newbie_singlehomed:
1399       # check reachability from my secondary ip to newbie's secondary ip
1400       if not utils.TcpPing(myself.secondary_ip,
1401                            secondary_ip,
1402                            constants.DEFAULT_NODED_PORT):
1403         raise errors.OpPrereqError(
1404           "Node secondary ip not reachable by TCP based ping to noded port")
1405
1406     self.new_node = objects.Node(name=node,
1407                                  primary_ip=primary_ip,
1408                                  secondary_ip=secondary_ip)
1409
1410   def Exec(self, feedback_fn):
1411     """Adds the new node to the cluster.
1412
1413     """
1414     new_node = self.new_node
1415     node = new_node.name
1416
1417     # set up inter-node password and certificate and restarts the node daemon
1418     gntpass = self.sstore.GetNodeDaemonPassword()
1419     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1420       raise errors.OpExecError("ganeti password corruption detected")
1421     f = open(constants.SSL_CERT_FILE)
1422     try:
1423       gntpem = f.read(8192)
1424     finally:
1425       f.close()
1426     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1427     # so we use this to detect an invalid certificate; as long as the
1428     # cert doesn't contain this, the here-document will be correctly
1429     # parsed by the shell sequence below
1430     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1431       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1432     if not gntpem.endswith("\n"):
1433       raise errors.OpExecError("PEM must end with newline")
1434     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1435
1436     # and then connect with ssh to set password and start ganeti-noded
1437     # note that all the below variables are sanitized at this point,
1438     # either by being constants or by the checks above
1439     ss = self.sstore
1440     mycommand = ("umask 077 && "
1441                  "echo '%s' > '%s' && "
1442                  "cat > '%s' << '!EOF.' && \n"
1443                  "%s!EOF.\n%s restart" %
1444                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1445                   constants.SSL_CERT_FILE, gntpem,
1446                   constants.NODE_INITD_SCRIPT))
1447
1448     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1449     if result.failed:
1450       raise errors.OpExecError("Remote command on node %s, error: %s,"
1451                                " output: %s" %
1452                                (node, result.fail_reason, result.output))
1453
1454     # check connectivity
1455     time.sleep(4)
1456
1457     result = rpc.call_version([node])[node]
1458     if result:
1459       if constants.PROTOCOL_VERSION == result:
1460         logger.Info("communication to node %s fine, sw version %s match" %
1461                     (node, result))
1462       else:
1463         raise errors.OpExecError("Version mismatch master version %s,"
1464                                  " node version %s" %
1465                                  (constants.PROTOCOL_VERSION, result))
1466     else:
1467       raise errors.OpExecError("Cannot get version from the new node")
1468
1469     # setup ssh on node
1470     logger.Info("copy ssh key to node %s" % node)
1471     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1472     keyarray = []
1473     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1474                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1475                 priv_key, pub_key]
1476
1477     for i in keyfiles:
1478       f = open(i, 'r')
1479       try:
1480         keyarray.append(f.read())
1481       finally:
1482         f.close()
1483
1484     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1485                                keyarray[3], keyarray[4], keyarray[5])
1486
1487     if not result:
1488       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1489
1490     # Add node to our /etc/hosts, and add key to known_hosts
1491     _AddHostToEtcHosts(new_node.name)
1492
1493     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1494                       self.cfg.GetHostKey())
1495
1496     if new_node.secondary_ip != new_node.primary_ip:
1497       if not rpc.call_node_tcp_ping(new_node.name,
1498                                     constants.LOCALHOST_IP_ADDRESS,
1499                                     new_node.secondary_ip,
1500                                     constants.DEFAULT_NODED_PORT,
1501                                     10, False):
1502         raise errors.OpExecError("Node claims it doesn't have the"
1503                                  " secondary ip you gave (%s).\n"
1504                                  "Please fix and re-run this command." %
1505                                  new_node.secondary_ip)
1506
1507     success, msg = ssh.VerifyNodeHostname(node)
1508     if not success:
1509       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1510                                " than the one the resolver gives: %s.\n"
1511                                "Please fix and re-run this command." %
1512                                (node, msg))
1513
1514     # Distribute updated /etc/hosts and known_hosts to all nodes,
1515     # including the node just added
1516     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1517     dist_nodes = self.cfg.GetNodeList() + [node]
1518     if myself.name in dist_nodes:
1519       dist_nodes.remove(myself.name)
1520
1521     logger.Debug("Copying hosts and known_hosts to all nodes")
1522     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1523       result = rpc.call_upload_file(dist_nodes, fname)
1524       for to_node in dist_nodes:
1525         if not result[to_node]:
1526           logger.Error("copy of file %s to node %s failed" %
1527                        (fname, to_node))
1528
1529     to_copy = ss.GetFileList()
1530     for fname in to_copy:
1531       if not ssh.CopyFileToNode(node, fname):
1532         logger.Error("could not copy file %s to node %s" % (fname, node))
1533
1534     logger.Info("adding node %s to cluster.conf" % node)
1535     self.cfg.AddNode(new_node)
1536
1537
1538 class LUMasterFailover(LogicalUnit):
1539   """Failover the master node to the current node.
1540
1541   This is a special LU in that it must run on a non-master node.
1542
1543   """
1544   HPATH = "master-failover"
1545   HTYPE = constants.HTYPE_CLUSTER
1546   REQ_MASTER = False
1547   _OP_REQP = []
1548
1549   def BuildHooksEnv(self):
1550     """Build hooks env.
1551
1552     This will run on the new master only in the pre phase, and on all
1553     the nodes in the post phase.
1554
1555     """
1556     env = {
1557       "OP_TARGET": self.new_master,
1558       "NEW_MASTER": self.new_master,
1559       "OLD_MASTER": self.old_master,
1560       }
1561     return env, [self.new_master], self.cfg.GetNodeList()
1562
1563   def CheckPrereq(self):
1564     """Check prerequisites.
1565
1566     This checks that we are not already the master.
1567
1568     """
1569     self.new_master = utils.HostInfo().name
1570     self.old_master = self.sstore.GetMasterNode()
1571
1572     if self.old_master == self.new_master:
1573       raise errors.OpPrereqError("This commands must be run on the node"
1574                                  " where you want the new master to be.\n"
1575                                  "%s is already the master" %
1576                                  self.old_master)
1577
1578   def Exec(self, feedback_fn):
1579     """Failover the master node.
1580
1581     This command, when run on a non-master node, will cause the current
1582     master to cease being master, and the non-master to become new
1583     master.
1584
1585     """
1586     #TODO: do not rely on gethostname returning the FQDN
1587     logger.Info("setting master to %s, old master: %s" %
1588                 (self.new_master, self.old_master))
1589
1590     if not rpc.call_node_stop_master(self.old_master):
1591       logger.Error("could disable the master role on the old master"
1592                    " %s, please disable manually" % self.old_master)
1593
1594     ss = self.sstore
1595     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1596     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1597                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1598       logger.Error("could not distribute the new simple store master file"
1599                    " to the other nodes, please check.")
1600
1601     if not rpc.call_node_start_master(self.new_master):
1602       logger.Error("could not start the master role on the new master"
1603                    " %s, please check" % self.new_master)
1604       feedback_fn("Error in activating the master IP on the new master,\n"
1605                   "please fix manually.")
1606
1607
1608
1609 class LUQueryClusterInfo(NoHooksLU):
1610   """Query cluster configuration.
1611
1612   """
1613   _OP_REQP = []
1614   REQ_MASTER = False
1615
1616   def CheckPrereq(self):
1617     """No prerequsites needed for this LU.
1618
1619     """
1620     pass
1621
1622   def Exec(self, feedback_fn):
1623     """Return cluster config.
1624
1625     """
1626     result = {
1627       "name": self.sstore.GetClusterName(),
1628       "software_version": constants.RELEASE_VERSION,
1629       "protocol_version": constants.PROTOCOL_VERSION,
1630       "config_version": constants.CONFIG_VERSION,
1631       "os_api_version": constants.OS_API_VERSION,
1632       "export_version": constants.EXPORT_VERSION,
1633       "master": self.sstore.GetMasterNode(),
1634       "architecture": (platform.architecture()[0], platform.machine()),
1635       }
1636
1637     return result
1638
1639
1640 class LUClusterCopyFile(NoHooksLU):
1641   """Copy file to cluster.
1642
1643   """
1644   _OP_REQP = ["nodes", "filename"]
1645
1646   def CheckPrereq(self):
1647     """Check prerequisites.
1648
1649     It should check that the named file exists and that the given list
1650     of nodes is valid.
1651
1652     """
1653     if not os.path.exists(self.op.filename):
1654       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1655
1656     self.nodes = _GetWantedNodes(self, self.op.nodes)
1657
1658   def Exec(self, feedback_fn):
1659     """Copy a file from master to some nodes.
1660
1661     Args:
1662       opts - class with options as members
1663       args - list containing a single element, the file name
1664     Opts used:
1665       nodes - list containing the name of target nodes; if empty, all nodes
1666
1667     """
1668     filename = self.op.filename
1669
1670     myname = utils.HostInfo().name
1671
1672     for node in self.nodes:
1673       if node == myname:
1674         continue
1675       if not ssh.CopyFileToNode(node, filename):
1676         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1677
1678
1679 class LUDumpClusterConfig(NoHooksLU):
1680   """Return a text-representation of the cluster-config.
1681
1682   """
1683   _OP_REQP = []
1684
1685   def CheckPrereq(self):
1686     """No prerequisites.
1687
1688     """
1689     pass
1690
1691   def Exec(self, feedback_fn):
1692     """Dump a representation of the cluster config to the standard output.
1693
1694     """
1695     return self.cfg.DumpConfig()
1696
1697
1698 class LURunClusterCommand(NoHooksLU):
1699   """Run a command on some nodes.
1700
1701   """
1702   _OP_REQP = ["command", "nodes"]
1703
1704   def CheckPrereq(self):
1705     """Check prerequisites.
1706
1707     It checks that the given list of nodes is valid.
1708
1709     """
1710     self.nodes = _GetWantedNodes(self, self.op.nodes)
1711
1712   def Exec(self, feedback_fn):
1713     """Run a command on some nodes.
1714
1715     """
1716     data = []
1717     for node in self.nodes:
1718       result = ssh.SSHCall(node, "root", self.op.command)
1719       data.append((node, result.output, result.exit_code))
1720
1721     return data
1722
1723
1724 class LUActivateInstanceDisks(NoHooksLU):
1725   """Bring up an instance's disks.
1726
1727   """
1728   _OP_REQP = ["instance_name"]
1729
1730   def CheckPrereq(self):
1731     """Check prerequisites.
1732
1733     This checks that the instance is in the cluster.
1734
1735     """
1736     instance = self.cfg.GetInstanceInfo(
1737       self.cfg.ExpandInstanceName(self.op.instance_name))
1738     if instance is None:
1739       raise errors.OpPrereqError("Instance '%s' not known" %
1740                                  self.op.instance_name)
1741     self.instance = instance
1742
1743
1744   def Exec(self, feedback_fn):
1745     """Activate the disks.
1746
1747     """
1748     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1749     if not disks_ok:
1750       raise errors.OpExecError("Cannot activate block devices")
1751
1752     return disks_info
1753
1754
1755 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1756   """Prepare the block devices for an instance.
1757
1758   This sets up the block devices on all nodes.
1759
1760   Args:
1761     instance: a ganeti.objects.Instance object
1762     ignore_secondaries: if true, errors on secondary nodes won't result
1763                         in an error return from the function
1764
1765   Returns:
1766     false if the operation failed
1767     list of (host, instance_visible_name, node_visible_name) if the operation
1768          suceeded with the mapping from node devices to instance devices
1769   """
1770   device_info = []
1771   disks_ok = True
1772   for inst_disk in instance.disks:
1773     master_result = None
1774     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1775       cfg.SetDiskID(node_disk, node)
1776       is_primary = node == instance.primary_node
1777       result = rpc.call_blockdev_assemble(node, node_disk,
1778                                           instance.name, is_primary)
1779       if not result:
1780         logger.Error("could not prepare block device %s on node %s (is_pri"
1781                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1782         if is_primary or not ignore_secondaries:
1783           disks_ok = False
1784       if is_primary:
1785         master_result = result
1786     device_info.append((instance.primary_node, inst_disk.iv_name,
1787                         master_result))
1788
1789   # leave the disks configured for the primary node
1790   # this is a workaround that would be fixed better by
1791   # improving the logical/physical id handling
1792   for disk in instance.disks:
1793     cfg.SetDiskID(disk, instance.primary_node)
1794
1795   return disks_ok, device_info
1796
1797
1798 def _StartInstanceDisks(cfg, instance, force):
1799   """Start the disks of an instance.
1800
1801   """
1802   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1803                                            ignore_secondaries=force)
1804   if not disks_ok:
1805     _ShutdownInstanceDisks(instance, cfg)
1806     if force is not None and not force:
1807       logger.Error("If the message above refers to a secondary node,"
1808                    " you can retry the operation using '--force'.")
1809     raise errors.OpExecError("Disk consistency error")
1810
1811
1812 class LUDeactivateInstanceDisks(NoHooksLU):
1813   """Shutdown an instance's disks.
1814
1815   """
1816   _OP_REQP = ["instance_name"]
1817
1818   def CheckPrereq(self):
1819     """Check prerequisites.
1820
1821     This checks that the instance is in the cluster.
1822
1823     """
1824     instance = self.cfg.GetInstanceInfo(
1825       self.cfg.ExpandInstanceName(self.op.instance_name))
1826     if instance is None:
1827       raise errors.OpPrereqError("Instance '%s' not known" %
1828                                  self.op.instance_name)
1829     self.instance = instance
1830
1831   def Exec(self, feedback_fn):
1832     """Deactivate the disks
1833
1834     """
1835     instance = self.instance
1836     ins_l = rpc.call_instance_list([instance.primary_node])
1837     ins_l = ins_l[instance.primary_node]
1838     if not type(ins_l) is list:
1839       raise errors.OpExecError("Can't contact node '%s'" %
1840                                instance.primary_node)
1841
1842     if self.instance.name in ins_l:
1843       raise errors.OpExecError("Instance is running, can't shutdown"
1844                                " block devices.")
1845
1846     _ShutdownInstanceDisks(instance, self.cfg)
1847
1848
1849 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1850   """Shutdown block devices of an instance.
1851
1852   This does the shutdown on all nodes of the instance.
1853
1854   If the ignore_primary is false, errors on the primary node are
1855   ignored.
1856
1857   """
1858   result = True
1859   for disk in instance.disks:
1860     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1861       cfg.SetDiskID(top_disk, node)
1862       if not rpc.call_blockdev_shutdown(node, top_disk):
1863         logger.Error("could not shutdown block device %s on node %s" %
1864                      (disk.iv_name, node))
1865         if not ignore_primary or node != instance.primary_node:
1866           result = False
1867   return result
1868
1869
1870 class LUStartupInstance(LogicalUnit):
1871   """Starts an instance.
1872
1873   """
1874   HPATH = "instance-start"
1875   HTYPE = constants.HTYPE_INSTANCE
1876   _OP_REQP = ["instance_name", "force"]
1877
1878   def BuildHooksEnv(self):
1879     """Build hooks env.
1880
1881     This runs on master, primary and secondary nodes of the instance.
1882
1883     """
1884     env = {
1885       "FORCE": self.op.force,
1886       }
1887     env.update(_BuildInstanceHookEnvByObject(self.instance))
1888     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1889           list(self.instance.secondary_nodes))
1890     return env, nl, nl
1891
1892   def CheckPrereq(self):
1893     """Check prerequisites.
1894
1895     This checks that the instance is in the cluster.
1896
1897     """
1898     instance = self.cfg.GetInstanceInfo(
1899       self.cfg.ExpandInstanceName(self.op.instance_name))
1900     if instance is None:
1901       raise errors.OpPrereqError("Instance '%s' not known" %
1902                                  self.op.instance_name)
1903
1904     # check bridges existance
1905     _CheckInstanceBridgesExist(instance)
1906
1907     self.instance = instance
1908     self.op.instance_name = instance.name
1909
1910   def Exec(self, feedback_fn):
1911     """Start the instance.
1912
1913     """
1914     instance = self.instance
1915     force = self.op.force
1916     extra_args = getattr(self.op, "extra_args", "")
1917
1918     node_current = instance.primary_node
1919
1920     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1921     if not nodeinfo:
1922       raise errors.OpExecError("Could not contact node %s for infos" %
1923                                (node_current))
1924
1925     freememory = nodeinfo[node_current]['memory_free']
1926     memory = instance.memory
1927     if memory > freememory:
1928       raise errors.OpExecError("Not enough memory to start instance"
1929                                " %s on node %s"
1930                                " needed %s MiB, available %s MiB" %
1931                                (instance.name, node_current, memory,
1932                                 freememory))
1933
1934     _StartInstanceDisks(self.cfg, instance, force)
1935
1936     if not rpc.call_instance_start(node_current, instance, extra_args):
1937       _ShutdownInstanceDisks(instance, self.cfg)
1938       raise errors.OpExecError("Could not start instance")
1939
1940     self.cfg.MarkInstanceUp(instance.name)
1941
1942
1943 class LURebootInstance(LogicalUnit):
1944   """Reboot an instance.
1945
1946   """
1947   HPATH = "instance-reboot"
1948   HTYPE = constants.HTYPE_INSTANCE
1949   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
1950
1951   def BuildHooksEnv(self):
1952     """Build hooks env.
1953
1954     This runs on master, primary and secondary nodes of the instance.
1955
1956     """
1957     env = {
1958       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
1959       }
1960     env.update(_BuildInstanceHookEnvByObject(self.instance))
1961     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1962           list(self.instance.secondary_nodes))
1963     return env, nl, nl
1964
1965   def CheckPrereq(self):
1966     """Check prerequisites.
1967
1968     This checks that the instance is in the cluster.
1969
1970     """
1971     instance = self.cfg.GetInstanceInfo(
1972       self.cfg.ExpandInstanceName(self.op.instance_name))
1973     if instance is None:
1974       raise errors.OpPrereqError("Instance '%s' not known" %
1975                                  self.op.instance_name)
1976
1977     # check bridges existance
1978     _CheckInstanceBridgesExist(instance)
1979
1980     self.instance = instance
1981     self.op.instance_name = instance.name
1982
1983   def Exec(self, feedback_fn):
1984     """Reboot the instance.
1985
1986     """
1987     instance = self.instance
1988     ignore_secondaries = self.op.ignore_secondaries
1989     reboot_type = self.op.reboot_type
1990     extra_args = getattr(self.op, "extra_args", "")
1991
1992     node_current = instance.primary_node
1993
1994     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
1995                            constants.INSTANCE_REBOOT_HARD,
1996                            constants.INSTANCE_REBOOT_FULL]:
1997       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
1998                                   (constants.INSTANCE_REBOOT_SOFT,
1999                                    constants.INSTANCE_REBOOT_HARD,
2000                                    constants.INSTANCE_REBOOT_FULL))
2001
2002     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2003                        constants.INSTANCE_REBOOT_HARD]:
2004       if not rpc.call_instance_reboot(node_current, instance,
2005                                       reboot_type, extra_args):
2006         raise errors.OpExecError("Could not reboot instance")
2007     else:
2008       if not rpc.call_instance_shutdown(node_current, instance):
2009         raise errors.OpExecError("could not shutdown instance for full reboot")
2010       _ShutdownInstanceDisks(instance, self.cfg)
2011       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2012       if not rpc.call_instance_start(node_current, instance, extra_args):
2013         _ShutdownInstanceDisks(instance, self.cfg)
2014         raise errors.OpExecError("Could not start instance for full reboot")
2015
2016     self.cfg.MarkInstanceUp(instance.name)
2017
2018
2019 class LUShutdownInstance(LogicalUnit):
2020   """Shutdown an instance.
2021
2022   """
2023   HPATH = "instance-stop"
2024   HTYPE = constants.HTYPE_INSTANCE
2025   _OP_REQP = ["instance_name"]
2026
2027   def BuildHooksEnv(self):
2028     """Build hooks env.
2029
2030     This runs on master, primary and secondary nodes of the instance.
2031
2032     """
2033     env = _BuildInstanceHookEnvByObject(self.instance)
2034     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2035           list(self.instance.secondary_nodes))
2036     return env, nl, nl
2037
2038   def CheckPrereq(self):
2039     """Check prerequisites.
2040
2041     This checks that the instance is in the cluster.
2042
2043     """
2044     instance = self.cfg.GetInstanceInfo(
2045       self.cfg.ExpandInstanceName(self.op.instance_name))
2046     if instance is None:
2047       raise errors.OpPrereqError("Instance '%s' not known" %
2048                                  self.op.instance_name)
2049     self.instance = instance
2050
2051   def Exec(self, feedback_fn):
2052     """Shutdown the instance.
2053
2054     """
2055     instance = self.instance
2056     node_current = instance.primary_node
2057     if not rpc.call_instance_shutdown(node_current, instance):
2058       logger.Error("could not shutdown instance")
2059
2060     self.cfg.MarkInstanceDown(instance.name)
2061     _ShutdownInstanceDisks(instance, self.cfg)
2062
2063
2064 class LUReinstallInstance(LogicalUnit):
2065   """Reinstall an instance.
2066
2067   """
2068   HPATH = "instance-reinstall"
2069   HTYPE = constants.HTYPE_INSTANCE
2070   _OP_REQP = ["instance_name"]
2071
2072   def BuildHooksEnv(self):
2073     """Build hooks env.
2074
2075     This runs on master, primary and secondary nodes of the instance.
2076
2077     """
2078     env = _BuildInstanceHookEnvByObject(self.instance)
2079     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2080           list(self.instance.secondary_nodes))
2081     return env, nl, nl
2082
2083   def CheckPrereq(self):
2084     """Check prerequisites.
2085
2086     This checks that the instance is in the cluster and is not running.
2087
2088     """
2089     instance = self.cfg.GetInstanceInfo(
2090       self.cfg.ExpandInstanceName(self.op.instance_name))
2091     if instance is None:
2092       raise errors.OpPrereqError("Instance '%s' not known" %
2093                                  self.op.instance_name)
2094     if instance.disk_template == constants.DT_DISKLESS:
2095       raise errors.OpPrereqError("Instance '%s' has no disks" %
2096                                  self.op.instance_name)
2097     if instance.status != "down":
2098       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2099                                  self.op.instance_name)
2100     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2101     if remote_info:
2102       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2103                                  (self.op.instance_name,
2104                                   instance.primary_node))
2105
2106     self.op.os_type = getattr(self.op, "os_type", None)
2107     if self.op.os_type is not None:
2108       # OS verification
2109       pnode = self.cfg.GetNodeInfo(
2110         self.cfg.ExpandNodeName(instance.primary_node))
2111       if pnode is None:
2112         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2113                                    self.op.pnode)
2114       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2115       if not os_obj:
2116         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2117                                    " primary node"  % self.op.os_type)
2118
2119     self.instance = instance
2120
2121   def Exec(self, feedback_fn):
2122     """Reinstall the instance.
2123
2124     """
2125     inst = self.instance
2126
2127     if self.op.os_type is not None:
2128       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2129       inst.os = self.op.os_type
2130       self.cfg.AddInstance(inst)
2131
2132     _StartInstanceDisks(self.cfg, inst, None)
2133     try:
2134       feedback_fn("Running the instance OS create scripts...")
2135       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2136         raise errors.OpExecError("Could not install OS for instance %s "
2137                                  "on node %s" %
2138                                  (inst.name, inst.primary_node))
2139     finally:
2140       _ShutdownInstanceDisks(inst, self.cfg)
2141
2142
2143 class LURenameInstance(LogicalUnit):
2144   """Rename an instance.
2145
2146   """
2147   HPATH = "instance-rename"
2148   HTYPE = constants.HTYPE_INSTANCE
2149   _OP_REQP = ["instance_name", "new_name"]
2150
2151   def BuildHooksEnv(self):
2152     """Build hooks env.
2153
2154     This runs on master, primary and secondary nodes of the instance.
2155
2156     """
2157     env = _BuildInstanceHookEnvByObject(self.instance)
2158     env["INSTANCE_NEW_NAME"] = self.op.new_name
2159     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2160           list(self.instance.secondary_nodes))
2161     return env, nl, nl
2162
2163   def CheckPrereq(self):
2164     """Check prerequisites.
2165
2166     This checks that the instance is in the cluster and is not running.
2167
2168     """
2169     instance = self.cfg.GetInstanceInfo(
2170       self.cfg.ExpandInstanceName(self.op.instance_name))
2171     if instance is None:
2172       raise errors.OpPrereqError("Instance '%s' not known" %
2173                                  self.op.instance_name)
2174     if instance.status != "down":
2175       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2176                                  self.op.instance_name)
2177     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2178     if remote_info:
2179       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2180                                  (self.op.instance_name,
2181                                   instance.primary_node))
2182     self.instance = instance
2183
2184     # new name verification
2185     name_info = utils.HostInfo(self.op.new_name)
2186
2187     self.op.new_name = new_name = name_info.name
2188     if not getattr(self.op, "ignore_ip", False):
2189       command = ["fping", "-q", name_info.ip]
2190       result = utils.RunCmd(command)
2191       if not result.failed:
2192         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2193                                    (name_info.ip, new_name))
2194
2195
2196   def Exec(self, feedback_fn):
2197     """Reinstall the instance.
2198
2199     """
2200     inst = self.instance
2201     old_name = inst.name
2202
2203     self.cfg.RenameInstance(inst.name, self.op.new_name)
2204
2205     # re-read the instance from the configuration after rename
2206     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2207
2208     _StartInstanceDisks(self.cfg, inst, None)
2209     try:
2210       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2211                                           "sda", "sdb"):
2212         msg = ("Could run OS rename script for instance %s\n"
2213                "on node %s\n"
2214                "(but the instance has been renamed in Ganeti)" %
2215                (inst.name, inst.primary_node))
2216         logger.Error(msg)
2217     finally:
2218       _ShutdownInstanceDisks(inst, self.cfg)
2219
2220
2221 class LURemoveInstance(LogicalUnit):
2222   """Remove an instance.
2223
2224   """
2225   HPATH = "instance-remove"
2226   HTYPE = constants.HTYPE_INSTANCE
2227   _OP_REQP = ["instance_name"]
2228
2229   def BuildHooksEnv(self):
2230     """Build hooks env.
2231
2232     This runs on master, primary and secondary nodes of the instance.
2233
2234     """
2235     env = _BuildInstanceHookEnvByObject(self.instance)
2236     nl = [self.sstore.GetMasterNode()]
2237     return env, nl, nl
2238
2239   def CheckPrereq(self):
2240     """Check prerequisites.
2241
2242     This checks that the instance is in the cluster.
2243
2244     """
2245     instance = self.cfg.GetInstanceInfo(
2246       self.cfg.ExpandInstanceName(self.op.instance_name))
2247     if instance is None:
2248       raise errors.OpPrereqError("Instance '%s' not known" %
2249                                  self.op.instance_name)
2250     self.instance = instance
2251
2252   def Exec(self, feedback_fn):
2253     """Remove the instance.
2254
2255     """
2256     instance = self.instance
2257     logger.Info("shutting down instance %s on node %s" %
2258                 (instance.name, instance.primary_node))
2259
2260     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2261       if self.op.ignore_failures:
2262         feedback_fn("Warning: can't shutdown instance")
2263       else:
2264         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2265                                  (instance.name, instance.primary_node))
2266
2267     logger.Info("removing block devices for instance %s" % instance.name)
2268
2269     if not _RemoveDisks(instance, self.cfg):
2270       if self.op.ignore_failures:
2271         feedback_fn("Warning: can't remove instance's disks")
2272       else:
2273         raise errors.OpExecError("Can't remove instance's disks")
2274
2275     logger.Info("removing instance %s out of cluster config" % instance.name)
2276
2277     self.cfg.RemoveInstance(instance.name)
2278
2279
2280 class LUQueryInstances(NoHooksLU):
2281   """Logical unit for querying instances.
2282
2283   """
2284   _OP_REQP = ["output_fields", "names"]
2285
2286   def CheckPrereq(self):
2287     """Check prerequisites.
2288
2289     This checks that the fields required are valid output fields.
2290
2291     """
2292     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2293     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2294                                "admin_state", "admin_ram",
2295                                "disk_template", "ip", "mac", "bridge",
2296                                "sda_size", "sdb_size"],
2297                        dynamic=self.dynamic_fields,
2298                        selected=self.op.output_fields)
2299
2300     self.wanted = _GetWantedInstances(self, self.op.names)
2301
2302   def Exec(self, feedback_fn):
2303     """Computes the list of nodes and their attributes.
2304
2305     """
2306     instance_names = self.wanted
2307     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2308                      in instance_names]
2309
2310     # begin data gathering
2311
2312     nodes = frozenset([inst.primary_node for inst in instance_list])
2313
2314     bad_nodes = []
2315     if self.dynamic_fields.intersection(self.op.output_fields):
2316       live_data = {}
2317       node_data = rpc.call_all_instances_info(nodes)
2318       for name in nodes:
2319         result = node_data[name]
2320         if result:
2321           live_data.update(result)
2322         elif result == False:
2323           bad_nodes.append(name)
2324         # else no instance is alive
2325     else:
2326       live_data = dict([(name, {}) for name in instance_names])
2327
2328     # end data gathering
2329
2330     output = []
2331     for instance in instance_list:
2332       iout = []
2333       for field in self.op.output_fields:
2334         if field == "name":
2335           val = instance.name
2336         elif field == "os":
2337           val = instance.os
2338         elif field == "pnode":
2339           val = instance.primary_node
2340         elif field == "snodes":
2341           val = list(instance.secondary_nodes)
2342         elif field == "admin_state":
2343           val = (instance.status != "down")
2344         elif field == "oper_state":
2345           if instance.primary_node in bad_nodes:
2346             val = None
2347           else:
2348             val = bool(live_data.get(instance.name))
2349         elif field == "admin_ram":
2350           val = instance.memory
2351         elif field == "oper_ram":
2352           if instance.primary_node in bad_nodes:
2353             val = None
2354           elif instance.name in live_data:
2355             val = live_data[instance.name].get("memory", "?")
2356           else:
2357             val = "-"
2358         elif field == "disk_template":
2359           val = instance.disk_template
2360         elif field == "ip":
2361           val = instance.nics[0].ip
2362         elif field == "bridge":
2363           val = instance.nics[0].bridge
2364         elif field == "mac":
2365           val = instance.nics[0].mac
2366         elif field == "sda_size" or field == "sdb_size":
2367           disk = instance.FindDisk(field[:3])
2368           if disk is None:
2369             val = None
2370           else:
2371             val = disk.size
2372         else:
2373           raise errors.ParameterError(field)
2374         iout.append(val)
2375       output.append(iout)
2376
2377     return output
2378
2379
2380 class LUFailoverInstance(LogicalUnit):
2381   """Failover an instance.
2382
2383   """
2384   HPATH = "instance-failover"
2385   HTYPE = constants.HTYPE_INSTANCE
2386   _OP_REQP = ["instance_name", "ignore_consistency"]
2387
2388   def BuildHooksEnv(self):
2389     """Build hooks env.
2390
2391     This runs on master, primary and secondary nodes of the instance.
2392
2393     """
2394     env = {
2395       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2396       }
2397     env.update(_BuildInstanceHookEnvByObject(self.instance))
2398     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2399     return env, nl, nl
2400
2401   def CheckPrereq(self):
2402     """Check prerequisites.
2403
2404     This checks that the instance is in the cluster.
2405
2406     """
2407     instance = self.cfg.GetInstanceInfo(
2408       self.cfg.ExpandInstanceName(self.op.instance_name))
2409     if instance is None:
2410       raise errors.OpPrereqError("Instance '%s' not known" %
2411                                  self.op.instance_name)
2412
2413     if instance.disk_template not in constants.DTS_NET_MIRROR:
2414       raise errors.OpPrereqError("Instance's disk layout is not"
2415                                  " network mirrored, cannot failover.")
2416
2417     secondary_nodes = instance.secondary_nodes
2418     if not secondary_nodes:
2419       raise errors.ProgrammerError("no secondary node but using "
2420                                    "DT_REMOTE_RAID1 template")
2421
2422     # check memory requirements on the secondary node
2423     target_node = secondary_nodes[0]
2424     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2425     info = nodeinfo.get(target_node, None)
2426     if not info:
2427       raise errors.OpPrereqError("Cannot get current information"
2428                                  " from node '%s'" % nodeinfo)
2429     if instance.memory > info['memory_free']:
2430       raise errors.OpPrereqError("Not enough memory on target node %s."
2431                                  " %d MB available, %d MB required" %
2432                                  (target_node, info['memory_free'],
2433                                   instance.memory))
2434
2435     # check bridge existance
2436     brlist = [nic.bridge for nic in instance.nics]
2437     if not rpc.call_bridges_exist(target_node, brlist):
2438       raise errors.OpPrereqError("One or more target bridges %s does not"
2439                                  " exist on destination node '%s'" %
2440                                  (brlist, target_node))
2441
2442     self.instance = instance
2443
2444   def Exec(self, feedback_fn):
2445     """Failover an instance.
2446
2447     The failover is done by shutting it down on its present node and
2448     starting it on the secondary.
2449
2450     """
2451     instance = self.instance
2452
2453     source_node = instance.primary_node
2454     target_node = instance.secondary_nodes[0]
2455
2456     feedback_fn("* checking disk consistency between source and target")
2457     for dev in instance.disks:
2458       # for remote_raid1, these are md over drbd
2459       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2460         if not self.op.ignore_consistency:
2461           raise errors.OpExecError("Disk %s is degraded on target node,"
2462                                    " aborting failover." % dev.iv_name)
2463
2464     feedback_fn("* checking target node resource availability")
2465     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2466
2467     if not nodeinfo:
2468       raise errors.OpExecError("Could not contact target node %s." %
2469                                target_node)
2470
2471     free_memory = int(nodeinfo[target_node]['memory_free'])
2472     memory = instance.memory
2473     if memory > free_memory:
2474       raise errors.OpExecError("Not enough memory to create instance %s on"
2475                                " node %s. needed %s MiB, available %s MiB" %
2476                                (instance.name, target_node, memory,
2477                                 free_memory))
2478
2479     feedback_fn("* shutting down instance on source node")
2480     logger.Info("Shutting down instance %s on node %s" %
2481                 (instance.name, source_node))
2482
2483     if not rpc.call_instance_shutdown(source_node, instance):
2484       if self.op.ignore_consistency:
2485         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2486                      " anyway. Please make sure node %s is down"  %
2487                      (instance.name, source_node, source_node))
2488       else:
2489         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2490                                  (instance.name, source_node))
2491
2492     feedback_fn("* deactivating the instance's disks on source node")
2493     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2494       raise errors.OpExecError("Can't shut down the instance's disks.")
2495
2496     instance.primary_node = target_node
2497     # distribute new instance config to the other nodes
2498     self.cfg.AddInstance(instance)
2499
2500     feedback_fn("* activating the instance's disks on target node")
2501     logger.Info("Starting instance %s on node %s" %
2502                 (instance.name, target_node))
2503
2504     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2505                                              ignore_secondaries=True)
2506     if not disks_ok:
2507       _ShutdownInstanceDisks(instance, self.cfg)
2508       raise errors.OpExecError("Can't activate the instance's disks")
2509
2510     feedback_fn("* starting the instance on the target node")
2511     if not rpc.call_instance_start(target_node, instance, None):
2512       _ShutdownInstanceDisks(instance, self.cfg)
2513       raise errors.OpExecError("Could not start instance %s on node %s." %
2514                                (instance.name, target_node))
2515
2516
2517 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2518   """Create a tree of block devices on the primary node.
2519
2520   This always creates all devices.
2521
2522   """
2523   if device.children:
2524     for child in device.children:
2525       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2526         return False
2527
2528   cfg.SetDiskID(device, node)
2529   new_id = rpc.call_blockdev_create(node, device, device.size,
2530                                     instance.name, True, info)
2531   if not new_id:
2532     return False
2533   if device.physical_id is None:
2534     device.physical_id = new_id
2535   return True
2536
2537
2538 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2539   """Create a tree of block devices on a secondary node.
2540
2541   If this device type has to be created on secondaries, create it and
2542   all its children.
2543
2544   If not, just recurse to children keeping the same 'force' value.
2545
2546   """
2547   if device.CreateOnSecondary():
2548     force = True
2549   if device.children:
2550     for child in device.children:
2551       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2552                                         child, force, info):
2553         return False
2554
2555   if not force:
2556     return True
2557   cfg.SetDiskID(device, node)
2558   new_id = rpc.call_blockdev_create(node, device, device.size,
2559                                     instance.name, False, info)
2560   if not new_id:
2561     return False
2562   if device.physical_id is None:
2563     device.physical_id = new_id
2564   return True
2565
2566
2567 def _GenerateUniqueNames(cfg, exts):
2568   """Generate a suitable LV name.
2569
2570   This will generate a logical volume name for the given instance.
2571
2572   """
2573   results = []
2574   for val in exts:
2575     new_id = cfg.GenerateUniqueID()
2576     results.append("%s%s" % (new_id, val))
2577   return results
2578
2579
2580 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2581   """Generate a drbd device complete with its children.
2582
2583   """
2584   port = cfg.AllocatePort()
2585   vgname = cfg.GetVGName()
2586   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2587                           logical_id=(vgname, names[0]))
2588   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2589                           logical_id=(vgname, names[1]))
2590   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2591                           logical_id = (primary, secondary, port),
2592                           children = [dev_data, dev_meta])
2593   return drbd_dev
2594
2595
2596 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2597   """Generate a drbd8 device complete with its children.
2598
2599   """
2600   port = cfg.AllocatePort()
2601   vgname = cfg.GetVGName()
2602   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2603                           logical_id=(vgname, names[0]))
2604   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2605                           logical_id=(vgname, names[1]))
2606   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2607                           logical_id = (primary, secondary, port),
2608                           children = [dev_data, dev_meta],
2609                           iv_name=iv_name)
2610   return drbd_dev
2611
2612 def _GenerateDiskTemplate(cfg, template_name,
2613                           instance_name, primary_node,
2614                           secondary_nodes, disk_sz, swap_sz):
2615   """Generate the entire disk layout for a given template type.
2616
2617   """
2618   #TODO: compute space requirements
2619
2620   vgname = cfg.GetVGName()
2621   if template_name == "diskless":
2622     disks = []
2623   elif template_name == "plain":
2624     if len(secondary_nodes) != 0:
2625       raise errors.ProgrammerError("Wrong template configuration")
2626
2627     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2628     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2629                            logical_id=(vgname, names[0]),
2630                            iv_name = "sda")
2631     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2632                            logical_id=(vgname, names[1]),
2633                            iv_name = "sdb")
2634     disks = [sda_dev, sdb_dev]
2635   elif template_name == "local_raid1":
2636     if len(secondary_nodes) != 0:
2637       raise errors.ProgrammerError("Wrong template configuration")
2638
2639
2640     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2641                                        ".sdb_m1", ".sdb_m2"])
2642     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2643                               logical_id=(vgname, names[0]))
2644     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2645                               logical_id=(vgname, names[1]))
2646     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2647                               size=disk_sz,
2648                               children = [sda_dev_m1, sda_dev_m2])
2649     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2650                               logical_id=(vgname, names[2]))
2651     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2652                               logical_id=(vgname, names[3]))
2653     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2654                               size=swap_sz,
2655                               children = [sdb_dev_m1, sdb_dev_m2])
2656     disks = [md_sda_dev, md_sdb_dev]
2657   elif template_name == constants.DT_REMOTE_RAID1:
2658     if len(secondary_nodes) != 1:
2659       raise errors.ProgrammerError("Wrong template configuration")
2660     remote_node = secondary_nodes[0]
2661     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2662                                        ".sdb_data", ".sdb_meta"])
2663     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2664                                          disk_sz, names[0:2])
2665     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2666                               children = [drbd_sda_dev], size=disk_sz)
2667     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2668                                          swap_sz, names[2:4])
2669     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2670                               children = [drbd_sdb_dev], size=swap_sz)
2671     disks = [md_sda_dev, md_sdb_dev]
2672   elif template_name == constants.DT_DRBD8:
2673     if len(secondary_nodes) != 1:
2674       raise errors.ProgrammerError("Wrong template configuration")
2675     remote_node = secondary_nodes[0]
2676     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2677                                        ".sdb_data", ".sdb_meta"])
2678     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2679                                          disk_sz, names[0:2], "sda")
2680     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2681                                          swap_sz, names[2:4], "sdb")
2682     disks = [drbd_sda_dev, drbd_sdb_dev]
2683   else:
2684     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2685   return disks
2686
2687
2688 def _GetInstanceInfoText(instance):
2689   """Compute that text that should be added to the disk's metadata.
2690
2691   """
2692   return "originstname+%s" % instance.name
2693
2694
2695 def _CreateDisks(cfg, instance):
2696   """Create all disks for an instance.
2697
2698   This abstracts away some work from AddInstance.
2699
2700   Args:
2701     instance: the instance object
2702
2703   Returns:
2704     True or False showing the success of the creation process
2705
2706   """
2707   info = _GetInstanceInfoText(instance)
2708
2709   for device in instance.disks:
2710     logger.Info("creating volume %s for instance %s" %
2711               (device.iv_name, instance.name))
2712     #HARDCODE
2713     for secondary_node in instance.secondary_nodes:
2714       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2715                                         device, False, info):
2716         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2717                      (device.iv_name, device, secondary_node))
2718         return False
2719     #HARDCODE
2720     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2721                                     instance, device, info):
2722       logger.Error("failed to create volume %s on primary!" %
2723                    device.iv_name)
2724       return False
2725   return True
2726
2727
2728 def _RemoveDisks(instance, cfg):
2729   """Remove all disks for an instance.
2730
2731   This abstracts away some work from `AddInstance()` and
2732   `RemoveInstance()`. Note that in case some of the devices couldn't
2733   be removed, the removal will continue with the other ones (compare
2734   with `_CreateDisks()`).
2735
2736   Args:
2737     instance: the instance object
2738
2739   Returns:
2740     True or False showing the success of the removal proces
2741
2742   """
2743   logger.Info("removing block devices for instance %s" % instance.name)
2744
2745   result = True
2746   for device in instance.disks:
2747     for node, disk in device.ComputeNodeTree(instance.primary_node):
2748       cfg.SetDiskID(disk, node)
2749       if not rpc.call_blockdev_remove(node, disk):
2750         logger.Error("could not remove block device %s on node %s,"
2751                      " continuing anyway" %
2752                      (device.iv_name, node))
2753         result = False
2754   return result
2755
2756
2757 class LUCreateInstance(LogicalUnit):
2758   """Create an instance.
2759
2760   """
2761   HPATH = "instance-add"
2762   HTYPE = constants.HTYPE_INSTANCE
2763   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2764               "disk_template", "swap_size", "mode", "start", "vcpus",
2765               "wait_for_sync", "ip_check"]
2766
2767   def BuildHooksEnv(self):
2768     """Build hooks env.
2769
2770     This runs on master, primary and secondary nodes of the instance.
2771
2772     """
2773     env = {
2774       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2775       "INSTANCE_DISK_SIZE": self.op.disk_size,
2776       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2777       "INSTANCE_ADD_MODE": self.op.mode,
2778       }
2779     if self.op.mode == constants.INSTANCE_IMPORT:
2780       env["INSTANCE_SRC_NODE"] = self.op.src_node
2781       env["INSTANCE_SRC_PATH"] = self.op.src_path
2782       env["INSTANCE_SRC_IMAGE"] = self.src_image
2783
2784     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2785       primary_node=self.op.pnode,
2786       secondary_nodes=self.secondaries,
2787       status=self.instance_status,
2788       os_type=self.op.os_type,
2789       memory=self.op.mem_size,
2790       vcpus=self.op.vcpus,
2791       nics=[(self.inst_ip, self.op.bridge)],
2792     ))
2793
2794     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2795           self.secondaries)
2796     return env, nl, nl
2797
2798
2799   def CheckPrereq(self):
2800     """Check prerequisites.
2801
2802     """
2803     if self.op.mode not in (constants.INSTANCE_CREATE,
2804                             constants.INSTANCE_IMPORT):
2805       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2806                                  self.op.mode)
2807
2808     if self.op.mode == constants.INSTANCE_IMPORT:
2809       src_node = getattr(self.op, "src_node", None)
2810       src_path = getattr(self.op, "src_path", None)
2811       if src_node is None or src_path is None:
2812         raise errors.OpPrereqError("Importing an instance requires source"
2813                                    " node and path options")
2814       src_node_full = self.cfg.ExpandNodeName(src_node)
2815       if src_node_full is None:
2816         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2817       self.op.src_node = src_node = src_node_full
2818
2819       if not os.path.isabs(src_path):
2820         raise errors.OpPrereqError("The source path must be absolute")
2821
2822       export_info = rpc.call_export_info(src_node, src_path)
2823
2824       if not export_info:
2825         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2826
2827       if not export_info.has_section(constants.INISECT_EXP):
2828         raise errors.ProgrammerError("Corrupted export config")
2829
2830       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2831       if (int(ei_version) != constants.EXPORT_VERSION):
2832         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2833                                    (ei_version, constants.EXPORT_VERSION))
2834
2835       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2836         raise errors.OpPrereqError("Can't import instance with more than"
2837                                    " one data disk")
2838
2839       # FIXME: are the old os-es, disk sizes, etc. useful?
2840       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2841       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2842                                                          'disk0_dump'))
2843       self.src_image = diskimage
2844     else: # INSTANCE_CREATE
2845       if getattr(self.op, "os_type", None) is None:
2846         raise errors.OpPrereqError("No guest OS specified")
2847
2848     # check primary node
2849     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2850     if pnode is None:
2851       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2852                                  self.op.pnode)
2853     self.op.pnode = pnode.name
2854     self.pnode = pnode
2855     self.secondaries = []
2856     # disk template and mirror node verification
2857     if self.op.disk_template not in constants.DISK_TEMPLATES:
2858       raise errors.OpPrereqError("Invalid disk template name")
2859
2860     if self.op.disk_template in constants.DTS_NET_MIRROR:
2861       if getattr(self.op, "snode", None) is None:
2862         raise errors.OpPrereqError("The networked disk templates need"
2863                                    " a mirror node")
2864
2865       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2866       if snode_name is None:
2867         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2868                                    self.op.snode)
2869       elif snode_name == pnode.name:
2870         raise errors.OpPrereqError("The secondary node cannot be"
2871                                    " the primary node.")
2872       self.secondaries.append(snode_name)
2873
2874     # Check lv size requirements
2875     nodenames = [pnode.name] + self.secondaries
2876     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2877
2878     # Required free disk space as a function of disk and swap space
2879     req_size_dict = {
2880       constants.DT_DISKLESS: 0,
2881       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2882       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2883       # 256 MB are added for drbd metadata, 128MB for each drbd device
2884       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2885       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2886     }
2887
2888     if self.op.disk_template not in req_size_dict:
2889       raise errors.ProgrammerError("Disk template '%s' size requirement"
2890                                    " is unknown" %  self.op.disk_template)
2891
2892     req_size = req_size_dict[self.op.disk_template]
2893
2894     for node in nodenames:
2895       info = nodeinfo.get(node, None)
2896       if not info:
2897         raise errors.OpPrereqError("Cannot get current information"
2898                                    " from node '%s'" % nodeinfo)
2899       if req_size > info['vg_free']:
2900         raise errors.OpPrereqError("Not enough disk space on target node %s."
2901                                    " %d MB available, %d MB required" %
2902                                    (node, info['vg_free'], req_size))
2903
2904     # os verification
2905     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2906     if not os_obj:
2907       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2908                                  " primary node"  % self.op.os_type)
2909
2910     # instance verification
2911     hostname1 = utils.HostInfo(self.op.instance_name)
2912
2913     self.op.instance_name = instance_name = hostname1.name
2914     instance_list = self.cfg.GetInstanceList()
2915     if instance_name in instance_list:
2916       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2917                                  instance_name)
2918
2919     ip = getattr(self.op, "ip", None)
2920     if ip is None or ip.lower() == "none":
2921       inst_ip = None
2922     elif ip.lower() == "auto":
2923       inst_ip = hostname1.ip
2924     else:
2925       if not utils.IsValidIP(ip):
2926         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2927                                    " like a valid IP" % ip)
2928       inst_ip = ip
2929     self.inst_ip = inst_ip
2930
2931     if self.op.start and not self.op.ip_check:
2932       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2933                                  " adding an instance in start mode")
2934
2935     if self.op.ip_check:
2936       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2937                        constants.DEFAULT_NODED_PORT):
2938         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2939                                    (hostname1.ip, instance_name))
2940
2941     # bridge verification
2942     bridge = getattr(self.op, "bridge", None)
2943     if bridge is None:
2944       self.op.bridge = self.cfg.GetDefBridge()
2945     else:
2946       self.op.bridge = bridge
2947
2948     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2949       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2950                                  " destination node '%s'" %
2951                                  (self.op.bridge, pnode.name))
2952
2953     if self.op.start:
2954       self.instance_status = 'up'
2955     else:
2956       self.instance_status = 'down'
2957
2958   def Exec(self, feedback_fn):
2959     """Create and add the instance to the cluster.
2960
2961     """
2962     instance = self.op.instance_name
2963     pnode_name = self.pnode.name
2964
2965     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2966     if self.inst_ip is not None:
2967       nic.ip = self.inst_ip
2968
2969     disks = _GenerateDiskTemplate(self.cfg,
2970                                   self.op.disk_template,
2971                                   instance, pnode_name,
2972                                   self.secondaries, self.op.disk_size,
2973                                   self.op.swap_size)
2974
2975     iobj = objects.Instance(name=instance, os=self.op.os_type,
2976                             primary_node=pnode_name,
2977                             memory=self.op.mem_size,
2978                             vcpus=self.op.vcpus,
2979                             nics=[nic], disks=disks,
2980                             disk_template=self.op.disk_template,
2981                             status=self.instance_status,
2982                             )
2983
2984     feedback_fn("* creating instance disks...")
2985     if not _CreateDisks(self.cfg, iobj):
2986       _RemoveDisks(iobj, self.cfg)
2987       raise errors.OpExecError("Device creation failed, reverting...")
2988
2989     feedback_fn("adding instance %s to cluster config" % instance)
2990
2991     self.cfg.AddInstance(iobj)
2992
2993     if self.op.wait_for_sync:
2994       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
2995     elif iobj.disk_template in constants.DTS_NET_MIRROR:
2996       # make sure the disks are not degraded (still sync-ing is ok)
2997       time.sleep(15)
2998       feedback_fn("* checking mirrors status")
2999       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3000     else:
3001       disk_abort = False
3002
3003     if disk_abort:
3004       _RemoveDisks(iobj, self.cfg)
3005       self.cfg.RemoveInstance(iobj.name)
3006       raise errors.OpExecError("There are some degraded disks for"
3007                                " this instance")
3008
3009     feedback_fn("creating os for instance %s on node %s" %
3010                 (instance, pnode_name))
3011
3012     if iobj.disk_template != constants.DT_DISKLESS:
3013       if self.op.mode == constants.INSTANCE_CREATE:
3014         feedback_fn("* running the instance OS create scripts...")
3015         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3016           raise errors.OpExecError("could not add os for instance %s"
3017                                    " on node %s" %
3018                                    (instance, pnode_name))
3019
3020       elif self.op.mode == constants.INSTANCE_IMPORT:
3021         feedback_fn("* running the instance OS import scripts...")
3022         src_node = self.op.src_node
3023         src_image = self.src_image
3024         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3025                                                 src_node, src_image):
3026           raise errors.OpExecError("Could not import os for instance"
3027                                    " %s on node %s" %
3028                                    (instance, pnode_name))
3029       else:
3030         # also checked in the prereq part
3031         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3032                                      % self.op.mode)
3033
3034     if self.op.start:
3035       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3036       feedback_fn("* starting instance...")
3037       if not rpc.call_instance_start(pnode_name, iobj, None):
3038         raise errors.OpExecError("Could not start instance")
3039
3040
3041 class LUConnectConsole(NoHooksLU):
3042   """Connect to an instance's console.
3043
3044   This is somewhat special in that it returns the command line that
3045   you need to run on the master node in order to connect to the
3046   console.
3047
3048   """
3049   _OP_REQP = ["instance_name"]
3050
3051   def CheckPrereq(self):
3052     """Check prerequisites.
3053
3054     This checks that the instance is in the cluster.
3055
3056     """
3057     instance = self.cfg.GetInstanceInfo(
3058       self.cfg.ExpandInstanceName(self.op.instance_name))
3059     if instance is None:
3060       raise errors.OpPrereqError("Instance '%s' not known" %
3061                                  self.op.instance_name)
3062     self.instance = instance
3063
3064   def Exec(self, feedback_fn):
3065     """Connect to the console of an instance
3066
3067     """
3068     instance = self.instance
3069     node = instance.primary_node
3070
3071     node_insts = rpc.call_instance_list([node])[node]
3072     if node_insts is False:
3073       raise errors.OpExecError("Can't connect to node %s." % node)
3074
3075     if instance.name not in node_insts:
3076       raise errors.OpExecError("Instance %s is not running." % instance.name)
3077
3078     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3079
3080     hyper = hypervisor.GetHypervisor()
3081     console_cmd = hyper.GetShellCommandForConsole(instance.name)
3082     # build ssh cmdline
3083     argv = ["ssh", "-q", "-t"]
3084     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3085     argv.extend(ssh.BATCH_MODE_OPTS)
3086     argv.append(node)
3087     argv.append(console_cmd)
3088     return "ssh", argv
3089
3090
3091 class LUAddMDDRBDComponent(LogicalUnit):
3092   """Adda new mirror member to an instance's disk.
3093
3094   """
3095   HPATH = "mirror-add"
3096   HTYPE = constants.HTYPE_INSTANCE
3097   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3098
3099   def BuildHooksEnv(self):
3100     """Build hooks env.
3101
3102     This runs on the master, the primary and all the secondaries.
3103
3104     """
3105     env = {
3106       "NEW_SECONDARY": self.op.remote_node,
3107       "DISK_NAME": self.op.disk_name,
3108       }
3109     env.update(_BuildInstanceHookEnvByObject(self.instance))
3110     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3111           self.op.remote_node,] + list(self.instance.secondary_nodes)
3112     return env, nl, nl
3113
3114   def CheckPrereq(self):
3115     """Check prerequisites.
3116
3117     This checks that the instance is in the cluster.
3118
3119     """
3120     instance = self.cfg.GetInstanceInfo(
3121       self.cfg.ExpandInstanceName(self.op.instance_name))
3122     if instance is None:
3123       raise errors.OpPrereqError("Instance '%s' not known" %
3124                                  self.op.instance_name)
3125     self.instance = instance
3126
3127     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3128     if remote_node is None:
3129       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3130     self.remote_node = remote_node
3131
3132     if remote_node == instance.primary_node:
3133       raise errors.OpPrereqError("The specified node is the primary node of"
3134                                  " the instance.")
3135
3136     if instance.disk_template != constants.DT_REMOTE_RAID1:
3137       raise errors.OpPrereqError("Instance's disk layout is not"
3138                                  " remote_raid1.")
3139     for disk in instance.disks:
3140       if disk.iv_name == self.op.disk_name:
3141         break
3142     else:
3143       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3144                                  " instance." % self.op.disk_name)
3145     if len(disk.children) > 1:
3146       raise errors.OpPrereqError("The device already has two slave"
3147                                  " devices.\n"
3148                                  "This would create a 3-disk raid1"
3149                                  " which we don't allow.")
3150     self.disk = disk
3151
3152   def Exec(self, feedback_fn):
3153     """Add the mirror component
3154
3155     """
3156     disk = self.disk
3157     instance = self.instance
3158
3159     remote_node = self.remote_node
3160     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3161     names = _GenerateUniqueNames(self.cfg, lv_names)
3162     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3163                                      remote_node, disk.size, names)
3164
3165     logger.Info("adding new mirror component on secondary")
3166     #HARDCODE
3167     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3168                                       new_drbd, False,
3169                                       _GetInstanceInfoText(instance)):
3170       raise errors.OpExecError("Failed to create new component on secondary"
3171                                " node %s" % remote_node)
3172
3173     logger.Info("adding new mirror component on primary")
3174     #HARDCODE
3175     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3176                                     instance, new_drbd,
3177                                     _GetInstanceInfoText(instance)):
3178       # remove secondary dev
3179       self.cfg.SetDiskID(new_drbd, remote_node)
3180       rpc.call_blockdev_remove(remote_node, new_drbd)
3181       raise errors.OpExecError("Failed to create volume on primary")
3182
3183     # the device exists now
3184     # call the primary node to add the mirror to md
3185     logger.Info("adding new mirror component to md")
3186     if not rpc.call_blockdev_addchildren(instance.primary_node,
3187                                          disk, [new_drbd]):
3188       logger.Error("Can't add mirror compoment to md!")
3189       self.cfg.SetDiskID(new_drbd, remote_node)
3190       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3191         logger.Error("Can't rollback on secondary")
3192       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3193       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3194         logger.Error("Can't rollback on primary")
3195       raise errors.OpExecError("Can't add mirror component to md array")
3196
3197     disk.children.append(new_drbd)
3198
3199     self.cfg.AddInstance(instance)
3200
3201     _WaitForSync(self.cfg, instance, self.proc)
3202
3203     return 0
3204
3205
3206 class LURemoveMDDRBDComponent(LogicalUnit):
3207   """Remove a component from a remote_raid1 disk.
3208
3209   """
3210   HPATH = "mirror-remove"
3211   HTYPE = constants.HTYPE_INSTANCE
3212   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3213
3214   def BuildHooksEnv(self):
3215     """Build hooks env.
3216
3217     This runs on the master, the primary and all the secondaries.
3218
3219     """
3220     env = {
3221       "DISK_NAME": self.op.disk_name,
3222       "DISK_ID": self.op.disk_id,
3223       "OLD_SECONDARY": self.old_secondary,
3224       }
3225     env.update(_BuildInstanceHookEnvByObject(self.instance))
3226     nl = [self.sstore.GetMasterNode(),
3227           self.instance.primary_node] + list(self.instance.secondary_nodes)
3228     return env, nl, nl
3229
3230   def CheckPrereq(self):
3231     """Check prerequisites.
3232
3233     This checks that the instance is in the cluster.
3234
3235     """
3236     instance = self.cfg.GetInstanceInfo(
3237       self.cfg.ExpandInstanceName(self.op.instance_name))
3238     if instance is None:
3239       raise errors.OpPrereqError("Instance '%s' not known" %
3240                                  self.op.instance_name)
3241     self.instance = instance
3242
3243     if instance.disk_template != constants.DT_REMOTE_RAID1:
3244       raise errors.OpPrereqError("Instance's disk layout is not"
3245                                  " remote_raid1.")
3246     for disk in instance.disks:
3247       if disk.iv_name == self.op.disk_name:
3248         break
3249     else:
3250       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3251                                  " instance." % self.op.disk_name)
3252     for child in disk.children:
3253       if (child.dev_type == constants.LD_DRBD7 and
3254           child.logical_id[2] == self.op.disk_id):
3255         break
3256     else:
3257       raise errors.OpPrereqError("Can't find the device with this port.")
3258
3259     if len(disk.children) < 2:
3260       raise errors.OpPrereqError("Cannot remove the last component from"
3261                                  " a mirror.")
3262     self.disk = disk
3263     self.child = child
3264     if self.child.logical_id[0] == instance.primary_node:
3265       oid = 1
3266     else:
3267       oid = 0
3268     self.old_secondary = self.child.logical_id[oid]
3269
3270   def Exec(self, feedback_fn):
3271     """Remove the mirror component
3272
3273     """
3274     instance = self.instance
3275     disk = self.disk
3276     child = self.child
3277     logger.Info("remove mirror component")
3278     self.cfg.SetDiskID(disk, instance.primary_node)
3279     if not rpc.call_blockdev_removechildren(instance.primary_node,
3280                                             disk, [child]):
3281       raise errors.OpExecError("Can't remove child from mirror.")
3282
3283     for node in child.logical_id[:2]:
3284       self.cfg.SetDiskID(child, node)
3285       if not rpc.call_blockdev_remove(node, child):
3286         logger.Error("Warning: failed to remove device from node %s,"
3287                      " continuing operation." % node)
3288
3289     disk.children.remove(child)
3290     self.cfg.AddInstance(instance)
3291
3292
3293 class LUReplaceDisks(LogicalUnit):
3294   """Replace the disks of an instance.
3295
3296   """
3297   HPATH = "mirrors-replace"
3298   HTYPE = constants.HTYPE_INSTANCE
3299   _OP_REQP = ["instance_name", "mode", "disks"]
3300
3301   def BuildHooksEnv(self):
3302     """Build hooks env.
3303
3304     This runs on the master, the primary and all the secondaries.
3305
3306     """
3307     env = {
3308       "MODE": self.op.mode,
3309       "NEW_SECONDARY": self.op.remote_node,
3310       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3311       }
3312     env.update(_BuildInstanceHookEnvByObject(self.instance))
3313     nl = [
3314       self.sstore.GetMasterNode(),
3315       self.instance.primary_node,
3316       ]
3317     if self.op.remote_node is not None:
3318       nl.append(self.op.remote_node)
3319     return env, nl, nl
3320
3321   def CheckPrereq(self):
3322     """Check prerequisites.
3323
3324     This checks that the instance is in the cluster.
3325
3326     """
3327     instance = self.cfg.GetInstanceInfo(
3328       self.cfg.ExpandInstanceName(self.op.instance_name))
3329     if instance is None:
3330       raise errors.OpPrereqError("Instance '%s' not known" %
3331                                  self.op.instance_name)
3332     self.instance = instance
3333     self.op.instance_name = instance.name
3334
3335     if instance.disk_template not in constants.DTS_NET_MIRROR:
3336       raise errors.OpPrereqError("Instance's disk layout is not"
3337                                  " network mirrored.")
3338
3339     if len(instance.secondary_nodes) != 1:
3340       raise errors.OpPrereqError("The instance has a strange layout,"
3341                                  " expected one secondary but found %d" %
3342                                  len(instance.secondary_nodes))
3343
3344     self.sec_node = instance.secondary_nodes[0]
3345
3346     remote_node = getattr(self.op, "remote_node", None)
3347     if remote_node is not None:
3348       remote_node = self.cfg.ExpandNodeName(remote_node)
3349       if remote_node is None:
3350         raise errors.OpPrereqError("Node '%s' not known" %
3351                                    self.op.remote_node)
3352       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3353     else:
3354       self.remote_node_info = None
3355     if remote_node == instance.primary_node:
3356       raise errors.OpPrereqError("The specified node is the primary node of"
3357                                  " the instance.")
3358     elif remote_node == self.sec_node:
3359       if self.op.mode == constants.REPLACE_DISK_SEC:
3360         # this is for DRBD8, where we can't execute the same mode of
3361         # replacement as for drbd7 (no different port allocated)
3362         raise errors.OpPrereqError("Same secondary given, cannot execute"
3363                                    " replacement")
3364       # the user gave the current secondary, switch to
3365       # 'no-replace-secondary' mode for drbd7
3366       remote_node = None
3367     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3368         self.op.mode != constants.REPLACE_DISK_ALL):
3369       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3370                                  " disks replacement, not individual ones")
3371     if instance.disk_template == constants.DT_DRBD8:
3372       if (self.op.mode == constants.REPLACE_DISK_ALL and
3373           remote_node is not None):
3374         # switch to replace secondary mode
3375         self.op.mode = constants.REPLACE_DISK_SEC
3376
3377       if self.op.mode == constants.REPLACE_DISK_ALL:
3378         raise errors.OpPrereqError("Template 'drbd8' only allows primary or"
3379                                    " secondary disk replacement, not"
3380                                    " both at once")
3381       elif self.op.mode == constants.REPLACE_DISK_PRI:
3382         if remote_node is not None:
3383           raise errors.OpPrereqError("Template 'drbd8' does not allow changing"
3384                                      " the secondary while doing a primary"
3385                                      " node disk replacement")
3386         self.tgt_node = instance.primary_node
3387         self.oth_node = instance.secondary_nodes[0]
3388       elif self.op.mode == constants.REPLACE_DISK_SEC:
3389         self.new_node = remote_node # this can be None, in which case
3390                                     # we don't change the secondary
3391         self.tgt_node = instance.secondary_nodes[0]
3392         self.oth_node = instance.primary_node
3393       else:
3394         raise errors.ProgrammerError("Unhandled disk replace mode")
3395
3396     for name in self.op.disks:
3397       if instance.FindDisk(name) is None:
3398         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3399                                    (name, instance.name))
3400     self.op.remote_node = remote_node
3401
3402   def _ExecRR1(self, feedback_fn):
3403     """Replace the disks of an instance.
3404
3405     """
3406     instance = self.instance
3407     iv_names = {}
3408     # start of work
3409     if self.op.remote_node is None:
3410       remote_node = self.sec_node
3411     else:
3412       remote_node = self.op.remote_node
3413     cfg = self.cfg
3414     for dev in instance.disks:
3415       size = dev.size
3416       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3417       names = _GenerateUniqueNames(cfg, lv_names)
3418       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3419                                        remote_node, size, names)
3420       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3421       logger.Info("adding new mirror component on secondary for %s" %
3422                   dev.iv_name)
3423       #HARDCODE
3424       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3425                                         new_drbd, False,
3426                                         _GetInstanceInfoText(instance)):
3427         raise errors.OpExecError("Failed to create new component on"
3428                                  " secondary node %s\n"
3429                                  "Full abort, cleanup manually!" %
3430                                  remote_node)
3431
3432       logger.Info("adding new mirror component on primary")
3433       #HARDCODE
3434       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3435                                       instance, new_drbd,
3436                                       _GetInstanceInfoText(instance)):
3437         # remove secondary dev
3438         cfg.SetDiskID(new_drbd, remote_node)
3439         rpc.call_blockdev_remove(remote_node, new_drbd)
3440         raise errors.OpExecError("Failed to create volume on primary!\n"
3441                                  "Full abort, cleanup manually!!")
3442
3443       # the device exists now
3444       # call the primary node to add the mirror to md
3445       logger.Info("adding new mirror component to md")
3446       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3447                                            [new_drbd]):
3448         logger.Error("Can't add mirror compoment to md!")
3449         cfg.SetDiskID(new_drbd, remote_node)
3450         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3451           logger.Error("Can't rollback on secondary")
3452         cfg.SetDiskID(new_drbd, instance.primary_node)
3453         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3454           logger.Error("Can't rollback on primary")
3455         raise errors.OpExecError("Full abort, cleanup manually!!")
3456
3457       dev.children.append(new_drbd)
3458       cfg.AddInstance(instance)
3459
3460     # this can fail as the old devices are degraded and _WaitForSync
3461     # does a combined result over all disks, so we don't check its
3462     # return value
3463     _WaitForSync(cfg, instance, self.proc, unlock=True)
3464
3465     # so check manually all the devices
3466     for name in iv_names:
3467       dev, child, new_drbd = iv_names[name]
3468       cfg.SetDiskID(dev, instance.primary_node)
3469       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3470       if is_degr:
3471         raise errors.OpExecError("MD device %s is degraded!" % name)
3472       cfg.SetDiskID(new_drbd, instance.primary_node)
3473       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3474       if is_degr:
3475         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3476
3477     for name in iv_names:
3478       dev, child, new_drbd = iv_names[name]
3479       logger.Info("remove mirror %s component" % name)
3480       cfg.SetDiskID(dev, instance.primary_node)
3481       if not rpc.call_blockdev_removechildren(instance.primary_node,
3482                                               dev, [child]):
3483         logger.Error("Can't remove child from mirror, aborting"
3484                      " *this device cleanup*.\nYou need to cleanup manually!!")
3485         continue
3486
3487       for node in child.logical_id[:2]:
3488         logger.Info("remove child device on %s" % node)
3489         cfg.SetDiskID(child, node)
3490         if not rpc.call_blockdev_remove(node, child):
3491           logger.Error("Warning: failed to remove device from node %s,"
3492                        " continuing operation." % node)
3493
3494       dev.children.remove(child)
3495
3496       cfg.AddInstance(instance)
3497
3498   def _ExecD8DiskOnly(self, feedback_fn):
3499     """Replace a disk on the primary or secondary for dbrd8.
3500
3501     The algorithm for replace is quite complicated:
3502       - for each disk to be replaced:
3503         - create new LVs on the target node with unique names
3504         - detach old LVs from the drbd device
3505         - rename old LVs to name_replaced.<time_t>
3506         - rename new LVs to old LVs
3507         - attach the new LVs (with the old names now) to the drbd device
3508       - wait for sync across all devices
3509       - for each modified disk:
3510         - remove old LVs (which have the name name_replaces.<time_t>)
3511
3512     Failures are not very well handled.
3513
3514     """
3515     steps_total = 6
3516     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3517     instance = self.instance
3518     iv_names = {}
3519     vgname = self.cfg.GetVGName()
3520     # start of work
3521     cfg = self.cfg
3522     tgt_node = self.tgt_node
3523     oth_node = self.oth_node
3524
3525     # Step: check device activation
3526     self.proc.LogStep(1, steps_total, "check device existence")
3527     info("checking volume groups")
3528     my_vg = cfg.GetVGName()
3529     results = rpc.call_vg_list([oth_node, tgt_node])
3530     if not results:
3531       raise errors.OpExecError("Can't list volume groups on the nodes")
3532     for node in oth_node, tgt_node:
3533       res = results.get(node, False)
3534       if not res or my_vg not in res:
3535         raise errors.OpExecError("Volume group '%s' not found on %s" %
3536                                  (my_vg, node))
3537     for dev in instance.disks:
3538       if not dev.iv_name in self.op.disks:
3539         continue
3540       for node in tgt_node, oth_node:
3541         info("checking %s on %s" % (dev.iv_name, node))
3542         cfg.SetDiskID(dev, node)
3543         if not rpc.call_blockdev_find(node, dev):
3544           raise errors.OpExecError("Can't find device %s on node %s" %
3545                                    (dev.iv_name, node))
3546
3547     # Step: check other node consistency
3548     self.proc.LogStep(2, steps_total, "check peer consistency")
3549     for dev in instance.disks:
3550       if not dev.iv_name in self.op.disks:
3551         continue
3552       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3553       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3554                                    oth_node==instance.primary_node):
3555         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3556                                  " to replace disks on this node (%s)" %
3557                                  (oth_node, tgt_node))
3558
3559     # Step: create new storage
3560     self.proc.LogStep(3, steps_total, "allocate new storage")
3561     for dev in instance.disks:
3562       if not dev.iv_name in self.op.disks:
3563         continue
3564       size = dev.size
3565       cfg.SetDiskID(dev, tgt_node)
3566       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3567       names = _GenerateUniqueNames(cfg, lv_names)
3568       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3569                              logical_id=(vgname, names[0]))
3570       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3571                              logical_id=(vgname, names[1]))
3572       new_lvs = [lv_data, lv_meta]
3573       old_lvs = dev.children
3574       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3575       info("creating new local storage on %s for %s" %
3576            (tgt_node, dev.iv_name))
3577       # since we *always* want to create this LV, we use the
3578       # _Create...OnPrimary (which forces the creation), even if we
3579       # are talking about the secondary node
3580       for new_lv in new_lvs:
3581         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3582                                         _GetInstanceInfoText(instance)):
3583           raise errors.OpExecError("Failed to create new LV named '%s' on"
3584                                    " node '%s'" %
3585                                    (new_lv.logical_id[1], tgt_node))
3586
3587     # Step: for each lv, detach+rename*2+attach
3588     self.proc.LogStep(4, steps_total, "change drbd configuration")
3589     for dev, old_lvs, new_lvs in iv_names.itervalues():
3590       info("detaching %s drbd from local storage" % dev.iv_name)
3591       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3592         raise errors.OpExecError("Can't detach drbd from local storage on node"
3593                                  " %s for device %s" % (tgt_node, dev.iv_name))
3594       #dev.children = []
3595       #cfg.Update(instance)
3596
3597       # ok, we created the new LVs, so now we know we have the needed
3598       # storage; as such, we proceed on the target node to rename
3599       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3600       # using the assumption than logical_id == physical_id (which in
3601       # turn is the unique_id on that node)
3602
3603       # FIXME(iustin): use a better name for the replaced LVs
3604       temp_suffix = int(time.time())
3605       ren_fn = lambda d, suff: (d.physical_id[0],
3606                                 d.physical_id[1] + "_replaced-%s" % suff)
3607       # build the rename list based on what LVs exist on the node
3608       rlist = []
3609       for to_ren in old_lvs:
3610         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3611         if find_res is not None: # device exists
3612           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3613
3614       info("renaming the old LVs on the target node")
3615       if not rpc.call_blockdev_rename(tgt_node, rlist):
3616         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3617       # now we rename the new LVs to the old LVs
3618       info("renaming the new LVs on the target node")
3619       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3620       if not rpc.call_blockdev_rename(tgt_node, rlist):
3621         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3622
3623       for old, new in zip(old_lvs, new_lvs):
3624         new.logical_id = old.logical_id
3625         cfg.SetDiskID(new, tgt_node)
3626
3627       for disk in old_lvs:
3628         disk.logical_id = ren_fn(disk, temp_suffix)
3629         cfg.SetDiskID(disk, tgt_node)
3630
3631       # now that the new lvs have the old name, we can add them to the device
3632       info("adding new mirror component on %s" % tgt_node)
3633       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3634         for new_lv in new_lvs:
3635           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3636             warning("Can't rollback device %s", "manually cleanup unused"
3637                     " logical volumes")
3638         raise errors.OpExecError("Can't add local storage to drbd")
3639
3640       dev.children = new_lvs
3641       cfg.Update(instance)
3642
3643     # Step: wait for sync
3644
3645     # this can fail as the old devices are degraded and _WaitForSync
3646     # does a combined result over all disks, so we don't check its
3647     # return value
3648     self.proc.LogStep(5, steps_total, "sync devices")
3649     _WaitForSync(cfg, instance, self.proc, unlock=True)
3650
3651     # so check manually all the devices
3652     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3653       cfg.SetDiskID(dev, instance.primary_node)
3654       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3655       if is_degr:
3656         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3657
3658     # Step: remove old storage
3659     self.proc.LogStep(6, steps_total, "removing old storage")
3660     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3661       info("remove logical volumes for %s" % name)
3662       for lv in old_lvs:
3663         cfg.SetDiskID(lv, tgt_node)
3664         if not rpc.call_blockdev_remove(tgt_node, lv):
3665           warning("Can't remove old LV", "manually remove unused LVs")
3666           continue
3667
3668   def _ExecD8Secondary(self, feedback_fn):
3669     """Replace the secondary node for drbd8.
3670
3671     The algorithm for replace is quite complicated:
3672       - for all disks of the instance:
3673         - create new LVs on the new node with same names
3674         - shutdown the drbd device on the old secondary
3675         - disconnect the drbd network on the primary
3676         - create the drbd device on the new secondary
3677         - network attach the drbd on the primary, using an artifice:
3678           the drbd code for Attach() will connect to the network if it
3679           finds a device which is connected to the good local disks but
3680           not network enabled
3681       - wait for sync across all devices
3682       - remove all disks from the old secondary
3683
3684     Failures are not very well handled.
3685
3686     """
3687     steps_total = 6
3688     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3689     instance = self.instance
3690     iv_names = {}
3691     vgname = self.cfg.GetVGName()
3692     # start of work
3693     cfg = self.cfg
3694     old_node = self.tgt_node
3695     new_node = self.new_node
3696     pri_node = instance.primary_node
3697
3698     # Step: check device activation
3699     self.proc.LogStep(1, steps_total, "check device existence")
3700     info("checking volume groups")
3701     my_vg = cfg.GetVGName()
3702     results = rpc.call_vg_list([pri_node, new_node])
3703     if not results:
3704       raise errors.OpExecError("Can't list volume groups on the nodes")
3705     for node in pri_node, new_node:
3706       res = results.get(node, False)
3707       if not res or my_vg not in res:
3708         raise errors.OpExecError("Volume group '%s' not found on %s" %
3709                                  (my_vg, node))
3710     for dev in instance.disks:
3711       if not dev.iv_name in self.op.disks:
3712         continue
3713       info("checking %s on %s" % (dev.iv_name, pri_node))
3714       cfg.SetDiskID(dev, pri_node)
3715       if not rpc.call_blockdev_find(pri_node, dev):
3716         raise errors.OpExecError("Can't find device %s on node %s" %
3717                                  (dev.iv_name, pri_node))
3718
3719     # Step: check other node consistency
3720     self.proc.LogStep(2, steps_total, "check peer consistency")
3721     for dev in instance.disks:
3722       if not dev.iv_name in self.op.disks:
3723         continue
3724       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3725       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3726         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3727                                  " unsafe to replace the secondary" %
3728                                  pri_node)
3729
3730     # Step: create new storage
3731     self.proc.LogStep(3, steps_total, "allocate new storage")
3732     for dev in instance.disks:
3733       size = dev.size
3734       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3735       # since we *always* want to create this LV, we use the
3736       # _Create...OnPrimary (which forces the creation), even if we
3737       # are talking about the secondary node
3738       for new_lv in dev.children:
3739         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3740                                         _GetInstanceInfoText(instance)):
3741           raise errors.OpExecError("Failed to create new LV named '%s' on"
3742                                    " node '%s'" %
3743                                    (new_lv.logical_id[1], new_node))
3744
3745       iv_names[dev.iv_name] = (dev, dev.children)
3746
3747     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3748     for dev in instance.disks:
3749       size = dev.size
3750       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3751       # create new devices on new_node
3752       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3753                               logical_id=(pri_node, new_node,
3754                                           dev.logical_id[2]),
3755                               children=dev.children)
3756       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3757                                         new_drbd, False,
3758                                       _GetInstanceInfoText(instance)):
3759         raise errors.OpExecError("Failed to create new DRBD on"
3760                                  " node '%s'" % new_node)
3761
3762     for dev in instance.disks:
3763       # we have new devices, shutdown the drbd on the old secondary
3764       info("shutting down drbd for %s on old node" % dev.iv_name)
3765       cfg.SetDiskID(dev, old_node)
3766       if not rpc.call_blockdev_shutdown(old_node, dev):
3767         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3768                 "Please cleanup this device manually as soon as possible")
3769
3770       # we have new storage, we 'rename' the network on the primary
3771       info("switching primary drbd for %s to new secondary node" % dev.iv_name)
3772       cfg.SetDiskID(dev, pri_node)
3773       # rename to the ip of the new node
3774       new_uid = list(dev.physical_id)
3775       new_uid[2] = self.remote_node_info.secondary_ip
3776       rlist = [(dev, tuple(new_uid))]
3777       if not rpc.call_blockdev_rename(pri_node, rlist):
3778         raise errors.OpExecError("Can't detach & re-attach drbd %s on node"
3779                                  " %s from %s to %s" %
3780                                  (dev.iv_name, pri_node, old_node, new_node))
3781       dev.logical_id = (pri_node, new_node, dev.logical_id[2])
3782       cfg.SetDiskID(dev, pri_node)
3783       cfg.Update(instance)
3784
3785
3786     # this can fail as the old devices are degraded and _WaitForSync
3787     # does a combined result over all disks, so we don't check its
3788     # return value
3789     self.proc.LogStep(5, steps_total, "sync devices")
3790     _WaitForSync(cfg, instance, self.proc, unlock=True)
3791
3792     # so check manually all the devices
3793     for name, (dev, old_lvs) in iv_names.iteritems():
3794       cfg.SetDiskID(dev, pri_node)
3795       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3796       if is_degr:
3797         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3798
3799     self.proc.LogStep(6, steps_total, "removing old storage")
3800     for name, (dev, old_lvs) in iv_names.iteritems():
3801       info("remove logical volumes for %s" % name)
3802       for lv in old_lvs:
3803         cfg.SetDiskID(lv, old_node)
3804         if not rpc.call_blockdev_remove(old_node, lv):
3805           warning("Can't remove LV on old secondary",
3806                   "Cleanup stale volumes by hand")
3807
3808   def Exec(self, feedback_fn):
3809     """Execute disk replacement.
3810
3811     This dispatches the disk replacement to the appropriate handler.
3812
3813     """
3814     instance = self.instance
3815     if instance.disk_template == constants.DT_REMOTE_RAID1:
3816       fn = self._ExecRR1
3817     elif instance.disk_template == constants.DT_DRBD8:
3818       if self.op.remote_node is None:
3819         fn = self._ExecD8DiskOnly
3820       else:
3821         fn = self._ExecD8Secondary
3822     else:
3823       raise errors.ProgrammerError("Unhandled disk replacement case")
3824     return fn(feedback_fn)
3825
3826
3827 class LUQueryInstanceData(NoHooksLU):
3828   """Query runtime instance data.
3829
3830   """
3831   _OP_REQP = ["instances"]
3832
3833   def CheckPrereq(self):
3834     """Check prerequisites.
3835
3836     This only checks the optional instance list against the existing names.
3837
3838     """
3839     if not isinstance(self.op.instances, list):
3840       raise errors.OpPrereqError("Invalid argument type 'instances'")
3841     if self.op.instances:
3842       self.wanted_instances = []
3843       names = self.op.instances
3844       for name in names:
3845         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3846         if instance is None:
3847           raise errors.OpPrereqError("No such instance name '%s'" % name)
3848       self.wanted_instances.append(instance)
3849     else:
3850       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3851                                in self.cfg.GetInstanceList()]
3852     return
3853
3854
3855   def _ComputeDiskStatus(self, instance, snode, dev):
3856     """Compute block device status.
3857
3858     """
3859     self.cfg.SetDiskID(dev, instance.primary_node)
3860     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3861     if dev.dev_type in constants.LDS_DRBD:
3862       # we change the snode then (otherwise we use the one passed in)
3863       if dev.logical_id[0] == instance.primary_node:
3864         snode = dev.logical_id[1]
3865       else:
3866         snode = dev.logical_id[0]
3867
3868     if snode:
3869       self.cfg.SetDiskID(dev, snode)
3870       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3871     else:
3872       dev_sstatus = None
3873
3874     if dev.children:
3875       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3876                       for child in dev.children]
3877     else:
3878       dev_children = []
3879
3880     data = {
3881       "iv_name": dev.iv_name,
3882       "dev_type": dev.dev_type,
3883       "logical_id": dev.logical_id,
3884       "physical_id": dev.physical_id,
3885       "pstatus": dev_pstatus,
3886       "sstatus": dev_sstatus,
3887       "children": dev_children,
3888       }
3889
3890     return data
3891
3892   def Exec(self, feedback_fn):
3893     """Gather and return data"""
3894     result = {}
3895     for instance in self.wanted_instances:
3896       remote_info = rpc.call_instance_info(instance.primary_node,
3897                                                 instance.name)
3898       if remote_info and "state" in remote_info:
3899         remote_state = "up"
3900       else:
3901         remote_state = "down"
3902       if instance.status == "down":
3903         config_state = "down"
3904       else:
3905         config_state = "up"
3906
3907       disks = [self._ComputeDiskStatus(instance, None, device)
3908                for device in instance.disks]
3909
3910       idict = {
3911         "name": instance.name,
3912         "config_state": config_state,
3913         "run_state": remote_state,
3914         "pnode": instance.primary_node,
3915         "snodes": instance.secondary_nodes,
3916         "os": instance.os,
3917         "memory": instance.memory,
3918         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3919         "disks": disks,
3920         "vcpus": instance.vcpus,
3921         }
3922
3923       result[instance.name] = idict
3924
3925     return result
3926
3927
3928 class LUSetInstanceParms(LogicalUnit):
3929   """Modifies an instances's parameters.
3930
3931   """
3932   HPATH = "instance-modify"
3933   HTYPE = constants.HTYPE_INSTANCE
3934   _OP_REQP = ["instance_name"]
3935
3936   def BuildHooksEnv(self):
3937     """Build hooks env.
3938
3939     This runs on the master, primary and secondaries.
3940
3941     """
3942     args = dict()
3943     if self.mem:
3944       args['memory'] = self.mem
3945     if self.vcpus:
3946       args['vcpus'] = self.vcpus
3947     if self.do_ip or self.do_bridge:
3948       if self.do_ip:
3949         ip = self.ip
3950       else:
3951         ip = self.instance.nics[0].ip
3952       if self.bridge:
3953         bridge = self.bridge
3954       else:
3955         bridge = self.instance.nics[0].bridge
3956       args['nics'] = [(ip, bridge)]
3957     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3958     nl = [self.sstore.GetMasterNode(),
3959           self.instance.primary_node] + list(self.instance.secondary_nodes)
3960     return env, nl, nl
3961
3962   def CheckPrereq(self):
3963     """Check prerequisites.
3964
3965     This only checks the instance list against the existing names.
3966
3967     """
3968     self.mem = getattr(self.op, "mem", None)
3969     self.vcpus = getattr(self.op, "vcpus", None)
3970     self.ip = getattr(self.op, "ip", None)
3971     self.bridge = getattr(self.op, "bridge", None)
3972     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3973       raise errors.OpPrereqError("No changes submitted")
3974     if self.mem is not None:
3975       try:
3976         self.mem = int(self.mem)
3977       except ValueError, err:
3978         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3979     if self.vcpus is not None:
3980       try:
3981         self.vcpus = int(self.vcpus)
3982       except ValueError, err:
3983         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3984     if self.ip is not None:
3985       self.do_ip = True
3986       if self.ip.lower() == "none":
3987         self.ip = None
3988       else:
3989         if not utils.IsValidIP(self.ip):
3990           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3991     else:
3992       self.do_ip = False
3993     self.do_bridge = (self.bridge is not None)
3994
3995     instance = self.cfg.GetInstanceInfo(
3996       self.cfg.ExpandInstanceName(self.op.instance_name))
3997     if instance is None:
3998       raise errors.OpPrereqError("No such instance name '%s'" %
3999                                  self.op.instance_name)
4000     self.op.instance_name = instance.name
4001     self.instance = instance
4002     return
4003
4004   def Exec(self, feedback_fn):
4005     """Modifies an instance.
4006
4007     All parameters take effect only at the next restart of the instance.
4008     """
4009     result = []
4010     instance = self.instance
4011     if self.mem:
4012       instance.memory = self.mem
4013       result.append(("mem", self.mem))
4014     if self.vcpus:
4015       instance.vcpus = self.vcpus
4016       result.append(("vcpus",  self.vcpus))
4017     if self.do_ip:
4018       instance.nics[0].ip = self.ip
4019       result.append(("ip", self.ip))
4020     if self.bridge:
4021       instance.nics[0].bridge = self.bridge
4022       result.append(("bridge", self.bridge))
4023
4024     self.cfg.AddInstance(instance)
4025
4026     return result
4027
4028
4029 class LUQueryExports(NoHooksLU):
4030   """Query the exports list
4031
4032   """
4033   _OP_REQP = []
4034
4035   def CheckPrereq(self):
4036     """Check that the nodelist contains only existing nodes.
4037
4038     """
4039     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4040
4041   def Exec(self, feedback_fn):
4042     """Compute the list of all the exported system images.
4043
4044     Returns:
4045       a dictionary with the structure node->(export-list)
4046       where export-list is a list of the instances exported on
4047       that node.
4048
4049     """
4050     return rpc.call_export_list(self.nodes)
4051
4052
4053 class LUExportInstance(LogicalUnit):
4054   """Export an instance to an image in the cluster.
4055
4056   """
4057   HPATH = "instance-export"
4058   HTYPE = constants.HTYPE_INSTANCE
4059   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4060
4061   def BuildHooksEnv(self):
4062     """Build hooks env.
4063
4064     This will run on the master, primary node and target node.
4065
4066     """
4067     env = {
4068       "EXPORT_NODE": self.op.target_node,
4069       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4070       }
4071     env.update(_BuildInstanceHookEnvByObject(self.instance))
4072     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4073           self.op.target_node]
4074     return env, nl, nl
4075
4076   def CheckPrereq(self):
4077     """Check prerequisites.
4078
4079     This checks that the instance name is a valid one.
4080
4081     """
4082     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4083     self.instance = self.cfg.GetInstanceInfo(instance_name)
4084     if self.instance is None:
4085       raise errors.OpPrereqError("Instance '%s' not found" %
4086                                  self.op.instance_name)
4087
4088     # node verification
4089     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4090     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4091
4092     if self.dst_node is None:
4093       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4094                                  self.op.target_node)
4095     self.op.target_node = self.dst_node.name
4096
4097   def Exec(self, feedback_fn):
4098     """Export an instance to an image in the cluster.
4099
4100     """
4101     instance = self.instance
4102     dst_node = self.dst_node
4103     src_node = instance.primary_node
4104     # shutdown the instance, unless requested not to do so
4105     if self.op.shutdown:
4106       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4107       self.proc.ChainOpCode(op)
4108
4109     vgname = self.cfg.GetVGName()
4110
4111     snap_disks = []
4112
4113     try:
4114       for disk in instance.disks:
4115         if disk.iv_name == "sda":
4116           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4117           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4118
4119           if not new_dev_name:
4120             logger.Error("could not snapshot block device %s on node %s" %
4121                          (disk.logical_id[1], src_node))
4122           else:
4123             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4124                                       logical_id=(vgname, new_dev_name),
4125                                       physical_id=(vgname, new_dev_name),
4126                                       iv_name=disk.iv_name)
4127             snap_disks.append(new_dev)
4128
4129     finally:
4130       if self.op.shutdown:
4131         op = opcodes.OpStartupInstance(instance_name=instance.name,
4132                                        force=False)
4133         self.proc.ChainOpCode(op)
4134
4135     # TODO: check for size
4136
4137     for dev in snap_disks:
4138       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4139                                            instance):
4140         logger.Error("could not export block device %s from node"
4141                      " %s to node %s" %
4142                      (dev.logical_id[1], src_node, dst_node.name))
4143       if not rpc.call_blockdev_remove(src_node, dev):
4144         logger.Error("could not remove snapshot block device %s from"
4145                      " node %s" % (dev.logical_id[1], src_node))
4146
4147     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4148       logger.Error("could not finalize export for instance %s on node %s" %
4149                    (instance.name, dst_node.name))
4150
4151     nodelist = self.cfg.GetNodeList()
4152     nodelist.remove(dst_node.name)
4153
4154     # on one-node clusters nodelist will be empty after the removal
4155     # if we proceed the backup would be removed because OpQueryExports
4156     # substitutes an empty list with the full cluster node list.
4157     if nodelist:
4158       op = opcodes.OpQueryExports(nodes=nodelist)
4159       exportlist = self.proc.ChainOpCode(op)
4160       for node in exportlist:
4161         if instance.name in exportlist[node]:
4162           if not rpc.call_export_remove(node, instance.name):
4163             logger.Error("could not remove older export for instance %s"
4164                          " on node %s" % (instance.name, node))
4165
4166
4167 class TagsLU(NoHooksLU):
4168   """Generic tags LU.
4169
4170   This is an abstract class which is the parent of all the other tags LUs.
4171
4172   """
4173   def CheckPrereq(self):
4174     """Check prerequisites.
4175
4176     """
4177     if self.op.kind == constants.TAG_CLUSTER:
4178       self.target = self.cfg.GetClusterInfo()
4179     elif self.op.kind == constants.TAG_NODE:
4180       name = self.cfg.ExpandNodeName(self.op.name)
4181       if name is None:
4182         raise errors.OpPrereqError("Invalid node name (%s)" %
4183                                    (self.op.name,))
4184       self.op.name = name
4185       self.target = self.cfg.GetNodeInfo(name)
4186     elif self.op.kind == constants.TAG_INSTANCE:
4187       name = self.cfg.ExpandInstanceName(self.op.name)
4188       if name is None:
4189         raise errors.OpPrereqError("Invalid instance name (%s)" %
4190                                    (self.op.name,))
4191       self.op.name = name
4192       self.target = self.cfg.GetInstanceInfo(name)
4193     else:
4194       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4195                                  str(self.op.kind))
4196
4197
4198 class LUGetTags(TagsLU):
4199   """Returns the tags of a given object.
4200
4201   """
4202   _OP_REQP = ["kind", "name"]
4203
4204   def Exec(self, feedback_fn):
4205     """Returns the tag list.
4206
4207     """
4208     return self.target.GetTags()
4209
4210
4211 class LUSearchTags(NoHooksLU):
4212   """Searches the tags for a given pattern.
4213
4214   """
4215   _OP_REQP = ["pattern"]
4216
4217   def CheckPrereq(self):
4218     """Check prerequisites.
4219
4220     This checks the pattern passed for validity by compiling it.
4221
4222     """
4223     try:
4224       self.re = re.compile(self.op.pattern)
4225     except re.error, err:
4226       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4227                                  (self.op.pattern, err))
4228
4229   def Exec(self, feedback_fn):
4230     """Returns the tag list.
4231
4232     """
4233     cfg = self.cfg
4234     tgts = [("/cluster", cfg.GetClusterInfo())]
4235     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4236     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4237     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4238     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4239     results = []
4240     for path, target in tgts:
4241       for tag in target.GetTags():
4242         if self.re.search(tag):
4243           results.append((path, tag))
4244     return results
4245
4246
4247 class LUAddTags(TagsLU):
4248   """Sets a tag on a given object.
4249
4250   """
4251   _OP_REQP = ["kind", "name", "tags"]
4252
4253   def CheckPrereq(self):
4254     """Check prerequisites.
4255
4256     This checks the type and length of the tag name and value.
4257
4258     """
4259     TagsLU.CheckPrereq(self)
4260     for tag in self.op.tags:
4261       objects.TaggableObject.ValidateTag(tag)
4262
4263   def Exec(self, feedback_fn):
4264     """Sets the tag.
4265
4266     """
4267     try:
4268       for tag in self.op.tags:
4269         self.target.AddTag(tag)
4270     except errors.TagError, err:
4271       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4272     try:
4273       self.cfg.Update(self.target)
4274     except errors.ConfigurationError:
4275       raise errors.OpRetryError("There has been a modification to the"
4276                                 " config file and the operation has been"
4277                                 " aborted. Please retry.")
4278
4279
4280 class LUDelTags(TagsLU):
4281   """Delete a list of tags from a given object.
4282
4283   """
4284   _OP_REQP = ["kind", "name", "tags"]
4285
4286   def CheckPrereq(self):
4287     """Check prerequisites.
4288
4289     This checks that we have the given tag.
4290
4291     """
4292     TagsLU.CheckPrereq(self)
4293     for tag in self.op.tags:
4294       objects.TaggableObject.ValidateTag(tag)
4295     del_tags = frozenset(self.op.tags)
4296     cur_tags = self.target.GetTags()
4297     if not del_tags <= cur_tags:
4298       diff_tags = del_tags - cur_tags
4299       diff_names = ["'%s'" % tag for tag in diff_tags]
4300       diff_names.sort()
4301       raise errors.OpPrereqError("Tag(s) %s not found" %
4302                                  (",".join(diff_names)))
4303
4304   def Exec(self, feedback_fn):
4305     """Remove the tag from the object.
4306
4307     """
4308     for tag in self.op.tags:
4309       self.target.RemoveTag(tag)
4310     try:
4311       self.cfg.Update(self.target)
4312     except errors.ConfigurationError:
4313       raise errors.OpRetryError("There has been a modification to the"
4314                                 " config file and the operation has been"
4315                                 " aborted. Please retry.")