Check whether init.d script is executable.
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.processor = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _GetWantedNodes(lu, nodes):
167   """Returns list of checked and expanded node names.
168
169   Args:
170     nodes: List of nodes (strings) or None for all
171
172   """
173   if not isinstance(nodes, list):
174     raise errors.OpPrereqError("Invalid argument type 'nodes'")
175
176   if nodes:
177     wanted = []
178
179     for name in nodes:
180       node = lu.cfg.ExpandNodeName(name)
181       if node is None:
182         raise errors.OpPrereqError("No such node name '%s'" % name)
183       wanted.append(node)
184
185   else:
186     wanted = lu.cfg.GetNodeList()
187   return utils.NiceSort(wanted)
188
189
190 def _GetWantedInstances(lu, instances):
191   """Returns list of checked and expanded instance names.
192
193   Args:
194     instances: List of instances (strings) or None for all
195
196   """
197   if not isinstance(instances, list):
198     raise errors.OpPrereqError("Invalid argument type 'instances'")
199
200   if instances:
201     wanted = []
202
203     for name in instances:
204       instance = lu.cfg.ExpandInstanceName(name)
205       if instance is None:
206         raise errors.OpPrereqError("No such instance name '%s'" % name)
207       wanted.append(instance)
208
209   else:
210     wanted = lu.cfg.GetInstanceList()
211   return utils.NiceSort(wanted)
212
213
214 def _CheckOutputFields(static, dynamic, selected):
215   """Checks whether all selected fields are valid.
216
217   Args:
218     static: Static fields
219     dynamic: Dynamic fields
220
221   """
222   static_fields = frozenset(static)
223   dynamic_fields = frozenset(dynamic)
224
225   all_fields = static_fields | dynamic_fields
226
227   if not all_fields.issuperset(selected):
228     raise errors.OpPrereqError("Unknown output fields selected: %s"
229                                % ",".join(frozenset(selected).
230                                           difference(all_fields)))
231
232
233 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
234                           memory, vcpus, nics):
235   """Builds instance related env variables for hooks from single variables.
236
237   Args:
238     secondary_nodes: List of secondary nodes as strings
239   """
240   env = {
241     "OP_TARGET": name,
242     "INSTANCE_NAME": name,
243     "INSTANCE_PRIMARY": primary_node,
244     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
245     "INSTANCE_OS_TYPE": os_type,
246     "INSTANCE_STATUS": status,
247     "INSTANCE_MEMORY": memory,
248     "INSTANCE_VCPUS": vcpus,
249   }
250
251   if nics:
252     nic_count = len(nics)
253     for idx, (ip, bridge) in enumerate(nics):
254       if ip is None:
255         ip = ""
256       env["INSTANCE_NIC%d_IP" % idx] = ip
257       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
258   else:
259     nic_count = 0
260
261   env["INSTANCE_NIC_COUNT"] = nic_count
262
263   return env
264
265
266 def _BuildInstanceHookEnvByObject(instance, override=None):
267   """Builds instance related env variables for hooks from an object.
268
269   Args:
270     instance: objects.Instance object of instance
271     override: dict of values to override
272   """
273   args = {
274     'name': instance.name,
275     'primary_node': instance.primary_node,
276     'secondary_nodes': instance.secondary_nodes,
277     'os_type': instance.os,
278     'status': instance.os,
279     'memory': instance.memory,
280     'vcpus': instance.vcpus,
281     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
282   }
283   if override:
284     args.update(override)
285   return _BuildInstanceHookEnv(**args)
286
287
288 def _UpdateEtcHosts(fullnode, ip):
289   """Ensure a node has a correct entry in /etc/hosts.
290
291   Args:
292     fullnode - Fully qualified domain name of host. (str)
293     ip       - IPv4 address of host (str)
294
295   """
296   node = fullnode.split(".", 1)[0]
297
298   f = open('/etc/hosts', 'r+')
299
300   inthere = False
301
302   save_lines = []
303   add_lines = []
304   removed = False
305
306   while True:
307     rawline = f.readline()
308
309     if not rawline:
310       # End of file
311       break
312
313     line = rawline.split('\n')[0]
314
315     # Strip off comments
316     line = line.split('#')[0]
317
318     if not line:
319       # Entire line was comment, skip
320       save_lines.append(rawline)
321       continue
322
323     fields = line.split()
324
325     haveall = True
326     havesome = False
327     for spec in [ ip, fullnode, node ]:
328       if spec not in fields:
329         haveall = False
330       if spec in fields:
331         havesome = True
332
333     if haveall:
334       inthere = True
335       save_lines.append(rawline)
336       continue
337
338     if havesome and not haveall:
339       # Line (old, or manual?) which is missing some.  Remove.
340       removed = True
341       continue
342
343     save_lines.append(rawline)
344
345   if not inthere:
346     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
347
348   if removed:
349     if add_lines:
350       save_lines = save_lines + add_lines
351
352     # We removed a line, write a new file and replace old.
353     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
354     newfile = os.fdopen(fd, 'w')
355     newfile.write(''.join(save_lines))
356     newfile.close()
357     os.rename(tmpname, '/etc/hosts')
358
359   elif add_lines:
360     # Simply appending a new line will do the trick.
361     f.seek(0, 2)
362     for add in add_lines:
363       f.write(add)
364
365   f.close()
366
367
368 def _UpdateKnownHosts(fullnode, ip, pubkey):
369   """Ensure a node has a correct known_hosts entry.
370
371   Args:
372     fullnode - Fully qualified domain name of host. (str)
373     ip       - IPv4 address of host (str)
374     pubkey   - the public key of the cluster
375
376   """
377   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
378     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
379   else:
380     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
381
382   inthere = False
383
384   save_lines = []
385   add_lines = []
386   removed = False
387
388   for rawline in f:
389     logger.Debug('read %s' % (repr(rawline),))
390
391     parts = rawline.rstrip('\r\n').split()
392
393     # Ignore unwanted lines
394     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
395       fields = parts[0].split(',')
396       key = parts[2]
397
398       haveall = True
399       havesome = False
400       for spec in [ ip, fullnode ]:
401         if spec not in fields:
402           haveall = False
403         if spec in fields:
404           havesome = True
405
406       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
407       if haveall and key == pubkey:
408         inthere = True
409         save_lines.append(rawline)
410         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
411         continue
412
413       if havesome and (not haveall or key != pubkey):
414         removed = True
415         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
416         continue
417
418     save_lines.append(rawline)
419
420   if not inthere:
421     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
422     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
423
424   if removed:
425     save_lines = save_lines + add_lines
426
427     # Write a new file and replace old.
428     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
429                                    constants.DATA_DIR)
430     newfile = os.fdopen(fd, 'w')
431     try:
432       newfile.write(''.join(save_lines))
433     finally:
434       newfile.close()
435     logger.Debug("Wrote new known_hosts.")
436     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
437
438   elif add_lines:
439     # Simply appending a new line will do the trick.
440     f.seek(0, 2)
441     for add in add_lines:
442       f.write(add)
443
444   f.close()
445
446
447 def _HasValidVG(vglist, vgname):
448   """Checks if the volume group list is valid.
449
450   A non-None return value means there's an error, and the return value
451   is the error message.
452
453   """
454   vgsize = vglist.get(vgname, None)
455   if vgsize is None:
456     return "volume group '%s' missing" % vgname
457   elif vgsize < 20480:
458     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
459             (vgname, vgsize))
460   return None
461
462
463 def _InitSSHSetup(node):
464   """Setup the SSH configuration for the cluster.
465
466
467   This generates a dsa keypair for root, adds the pub key to the
468   permitted hosts and adds the hostkey to its own known hosts.
469
470   Args:
471     node: the name of this host as a fqdn
472
473   """
474   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
475
476   for name in priv_key, pub_key:
477     if os.path.exists(name):
478       utils.CreateBackup(name)
479     utils.RemoveFile(name)
480
481   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
482                          "-f", priv_key,
483                          "-q", "-N", ""])
484   if result.failed:
485     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
486                              result.output)
487
488   f = open(pub_key, 'r')
489   try:
490     utils.AddAuthorizedKey(auth_keys, f.read(8192))
491   finally:
492     f.close()
493
494
495 def _InitGanetiServerSetup(ss):
496   """Setup the necessary configuration for the initial node daemon.
497
498   This creates the nodepass file containing the shared password for
499   the cluster and also generates the SSL certificate.
500
501   """
502   # Create pseudo random password
503   randpass = sha.new(os.urandom(64)).hexdigest()
504   # and write it into sstore
505   ss.SetKey(ss.SS_NODED_PASS, randpass)
506
507   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
508                          "-days", str(365*5), "-nodes", "-x509",
509                          "-keyout", constants.SSL_CERT_FILE,
510                          "-out", constants.SSL_CERT_FILE, "-batch"])
511   if result.failed:
512     raise errors.OpExecError("could not generate server ssl cert, command"
513                              " %s had exitcode %s and error message %s" %
514                              (result.cmd, result.exit_code, result.output))
515
516   os.chmod(constants.SSL_CERT_FILE, 0400)
517
518   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
519
520   if result.failed:
521     raise errors.OpExecError("Could not start the node daemon, command %s"
522                              " had exitcode %s and error %s" %
523                              (result.cmd, result.exit_code, result.output))
524
525
526 def _CheckInstanceBridgesExist(instance):
527   """Check that the brigdes needed by an instance exist.
528
529   """
530   # check bridges existance
531   brlist = [nic.bridge for nic in instance.nics]
532   if not rpc.call_bridges_exist(instance.primary_node, brlist):
533     raise errors.OpPrereqError("one or more target bridges %s does not"
534                                " exist on destination node '%s'" %
535                                (brlist, instance.primary_node))
536
537
538 class LUInitCluster(LogicalUnit):
539   """Initialise the cluster.
540
541   """
542   HPATH = "cluster-init"
543   HTYPE = constants.HTYPE_CLUSTER
544   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
545               "def_bridge", "master_netdev"]
546   REQ_CLUSTER = False
547
548   def BuildHooksEnv(self):
549     """Build hooks env.
550
551     Notes: Since we don't require a cluster, we must manually add
552     ourselves in the post-run node list.
553
554     """
555     env = {"OP_TARGET": self.op.cluster_name}
556     return env, [], [self.hostname.name]
557
558   def CheckPrereq(self):
559     """Verify that the passed name is a valid one.
560
561     """
562     if config.ConfigWriter.IsCluster():
563       raise errors.OpPrereqError("Cluster is already initialised")
564
565     self.hostname = hostname = utils.HostInfo()
566
567     if hostname.ip.startswith("127."):
568       raise errors.OpPrereqError("This host's IP resolves to the private"
569                                  " range (%s). Please fix DNS or /etc/hosts." %
570                                  (hostname.ip,))
571
572     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
573
574     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
575                          constants.DEFAULT_NODED_PORT):
576       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
577                                  " to %s,\nbut this ip address does not"
578                                  " belong to this host."
579                                  " Aborting." % hostname.ip)
580
581     secondary_ip = getattr(self.op, "secondary_ip", None)
582     if secondary_ip and not utils.IsValidIP(secondary_ip):
583       raise errors.OpPrereqError("Invalid secondary ip given")
584     if (secondary_ip and
585         secondary_ip != hostname.ip and
586         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
587                            constants.DEFAULT_NODED_PORT))):
588       raise errors.OpPrereqError("You gave %s as secondary IP,\n"
589                                  "but it does not belong to this host." %
590                                  secondary_ip)
591     self.secondary_ip = secondary_ip
592
593     # checks presence of the volume group given
594     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
595
596     if vgstatus:
597       raise errors.OpPrereqError("Error: %s" % vgstatus)
598
599     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
600                     self.op.mac_prefix):
601       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
602                                  self.op.mac_prefix)
603
604     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
605       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
606                                  self.op.hypervisor_type)
607
608     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
609     if result.failed:
610       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
611                                  (self.op.master_netdev,
612                                   result.output.strip()))
613
614     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
615             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
616       raise errors.OpPrereqError("Init.d script '%s' missing or not "
617                                  "executable." % constants.NODE_INITD_SCRIPT)
618
619   def Exec(self, feedback_fn):
620     """Initialize the cluster.
621
622     """
623     clustername = self.clustername
624     hostname = self.hostname
625
626     # set up the simple store
627     self.sstore = ss = ssconf.SimpleStore()
628     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
629     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
630     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
631     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
632     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
633
634     # set up the inter-node password and certificate
635     _InitGanetiServerSetup(ss)
636
637     # start the master ip
638     rpc.call_node_start_master(hostname.name)
639
640     # set up ssh config and /etc/hosts
641     f = open(constants.SSH_HOST_RSA_PUB, 'r')
642     try:
643       sshline = f.read()
644     finally:
645       f.close()
646     sshkey = sshline.split(" ")[1]
647
648     _UpdateEtcHosts(hostname.name, hostname.ip)
649
650     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
651
652     _InitSSHSetup(hostname.name)
653
654     # init of cluster config file
655     self.cfg = cfgw = config.ConfigWriter()
656     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
657                     sshkey, self.op.mac_prefix,
658                     self.op.vg_name, self.op.def_bridge)
659
660
661 class LUDestroyCluster(NoHooksLU):
662   """Logical unit for destroying the cluster.
663
664   """
665   _OP_REQP = []
666
667   def CheckPrereq(self):
668     """Check prerequisites.
669
670     This checks whether the cluster is empty.
671
672     Any errors are signalled by raising errors.OpPrereqError.
673
674     """
675     master = self.sstore.GetMasterNode()
676
677     nodelist = self.cfg.GetNodeList()
678     if len(nodelist) != 1 or nodelist[0] != master:
679       raise errors.OpPrereqError("There are still %d node(s) in"
680                                  " this cluster." % (len(nodelist) - 1))
681     instancelist = self.cfg.GetInstanceList()
682     if instancelist:
683       raise errors.OpPrereqError("There are still %d instance(s) in"
684                                  " this cluster." % len(instancelist))
685
686   def Exec(self, feedback_fn):
687     """Destroys the cluster.
688
689     """
690     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
691     utils.CreateBackup(priv_key)
692     utils.CreateBackup(pub_key)
693     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
694
695
696 class LUVerifyCluster(NoHooksLU):
697   """Verifies the cluster status.
698
699   """
700   _OP_REQP = []
701
702   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
703                   remote_version, feedback_fn):
704     """Run multiple tests against a node.
705
706     Test list:
707       - compares ganeti version
708       - checks vg existance and size > 20G
709       - checks config file checksum
710       - checks ssh to other nodes
711
712     Args:
713       node: name of the node to check
714       file_list: required list of files
715       local_cksum: dictionary of local files and their checksums
716
717     """
718     # compares ganeti version
719     local_version = constants.PROTOCOL_VERSION
720     if not remote_version:
721       feedback_fn(" - ERROR: connection to %s failed" % (node))
722       return True
723
724     if local_version != remote_version:
725       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
726                       (local_version, node, remote_version))
727       return True
728
729     # checks vg existance and size > 20G
730
731     bad = False
732     if not vglist:
733       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
734                       (node,))
735       bad = True
736     else:
737       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
738       if vgstatus:
739         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
740         bad = True
741
742     # checks config file checksum
743     # checks ssh to any
744
745     if 'filelist' not in node_result:
746       bad = True
747       feedback_fn("  - ERROR: node hasn't returned file checksum data")
748     else:
749       remote_cksum = node_result['filelist']
750       for file_name in file_list:
751         if file_name not in remote_cksum:
752           bad = True
753           feedback_fn("  - ERROR: file '%s' missing" % file_name)
754         elif remote_cksum[file_name] != local_cksum[file_name]:
755           bad = True
756           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
757
758     if 'nodelist' not in node_result:
759       bad = True
760       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
761     else:
762       if node_result['nodelist']:
763         bad = True
764         for node in node_result['nodelist']:
765           feedback_fn("  - ERROR: communication with node '%s': %s" %
766                           (node, node_result['nodelist'][node]))
767     hyp_result = node_result.get('hypervisor', None)
768     if hyp_result is not None:
769       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
770     return bad
771
772   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
773     """Verify an instance.
774
775     This function checks to see if the required block devices are
776     available on the instance's node.
777
778     """
779     bad = False
780
781     instancelist = self.cfg.GetInstanceList()
782     if not instance in instancelist:
783       feedback_fn("  - ERROR: instance %s not in instance list %s" %
784                       (instance, instancelist))
785       bad = True
786
787     instanceconfig = self.cfg.GetInstanceInfo(instance)
788     node_current = instanceconfig.primary_node
789
790     node_vol_should = {}
791     instanceconfig.MapLVsByNode(node_vol_should)
792
793     for node in node_vol_should:
794       for volume in node_vol_should[node]:
795         if node not in node_vol_is or volume not in node_vol_is[node]:
796           feedback_fn("  - ERROR: volume %s missing on node %s" %
797                           (volume, node))
798           bad = True
799
800     if not instanceconfig.status == 'down':
801       if not instance in node_instance[node_current]:
802         feedback_fn("  - ERROR: instance %s not running on node %s" %
803                         (instance, node_current))
804         bad = True
805
806     for node in node_instance:
807       if (not node == node_current):
808         if instance in node_instance[node]:
809           feedback_fn("  - ERROR: instance %s should not run on node %s" %
810                           (instance, node))
811           bad = True
812
813     return bad
814
815   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
816     """Verify if there are any unknown volumes in the cluster.
817
818     The .os, .swap and backup volumes are ignored. All other volumes are
819     reported as unknown.
820
821     """
822     bad = False
823
824     for node in node_vol_is:
825       for volume in node_vol_is[node]:
826         if node not in node_vol_should or volume not in node_vol_should[node]:
827           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
828                       (volume, node))
829           bad = True
830     return bad
831
832   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
833     """Verify the list of running instances.
834
835     This checks what instances are running but unknown to the cluster.
836
837     """
838     bad = False
839     for node in node_instance:
840       for runninginstance in node_instance[node]:
841         if runninginstance not in instancelist:
842           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
843                           (runninginstance, node))
844           bad = True
845     return bad
846
847   def CheckPrereq(self):
848     """Check prerequisites.
849
850     This has no prerequisites.
851
852     """
853     pass
854
855   def Exec(self, feedback_fn):
856     """Verify integrity of cluster, performing various test on nodes.
857
858     """
859     bad = False
860     feedback_fn("* Verifying global settings")
861     self.cfg.VerifyConfig()
862
863     vg_name = self.cfg.GetVGName()
864     nodelist = utils.NiceSort(self.cfg.GetNodeList())
865     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
866     node_volume = {}
867     node_instance = {}
868
869     # FIXME: verify OS list
870     # do local checksums
871     file_names = list(self.sstore.GetFileList())
872     file_names.append(constants.SSL_CERT_FILE)
873     file_names.append(constants.CLUSTER_CONF_FILE)
874     local_checksums = utils.FingerprintFiles(file_names)
875
876     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
877     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
878     all_instanceinfo = rpc.call_instance_list(nodelist)
879     all_vglist = rpc.call_vg_list(nodelist)
880     node_verify_param = {
881       'filelist': file_names,
882       'nodelist': nodelist,
883       'hypervisor': None,
884       }
885     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
886     all_rversion = rpc.call_version(nodelist)
887
888     for node in nodelist:
889       feedback_fn("* Verifying node %s" % node)
890       result = self._VerifyNode(node, file_names, local_checksums,
891                                 all_vglist[node], all_nvinfo[node],
892                                 all_rversion[node], feedback_fn)
893       bad = bad or result
894
895       # node_volume
896       volumeinfo = all_volumeinfo[node]
897
898       if type(volumeinfo) != dict:
899         feedback_fn("  - ERROR: connection to %s failed" % (node,))
900         bad = True
901         continue
902
903       node_volume[node] = volumeinfo
904
905       # node_instance
906       nodeinstance = all_instanceinfo[node]
907       if type(nodeinstance) != list:
908         feedback_fn("  - ERROR: connection to %s failed" % (node,))
909         bad = True
910         continue
911
912       node_instance[node] = nodeinstance
913
914     node_vol_should = {}
915
916     for instance in instancelist:
917       feedback_fn("* Verifying instance %s" % instance)
918       result =  self._VerifyInstance(instance, node_volume, node_instance,
919                                      feedback_fn)
920       bad = bad or result
921
922       inst_config = self.cfg.GetInstanceInfo(instance)
923
924       inst_config.MapLVsByNode(node_vol_should)
925
926     feedback_fn("* Verifying orphan volumes")
927     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
928                                        feedback_fn)
929     bad = bad or result
930
931     feedback_fn("* Verifying remaining instances")
932     result = self._VerifyOrphanInstances(instancelist, node_instance,
933                                          feedback_fn)
934     bad = bad or result
935
936     return int(bad)
937
938
939 class LURenameCluster(LogicalUnit):
940   """Rename the cluster.
941
942   """
943   HPATH = "cluster-rename"
944   HTYPE = constants.HTYPE_CLUSTER
945   _OP_REQP = ["name"]
946
947   def BuildHooksEnv(self):
948     """Build hooks env.
949
950     """
951     env = {
952       "OP_TARGET": self.op.sstore.GetClusterName(),
953       "NEW_NAME": self.op.name,
954       }
955     mn = self.sstore.GetMasterNode()
956     return env, [mn], [mn]
957
958   def CheckPrereq(self):
959     """Verify that the passed name is a valid one.
960
961     """
962     hostname = utils.HostInfo(self.op.name)
963
964     new_name = hostname.name
965     self.ip = new_ip = hostname.ip
966     old_name = self.sstore.GetClusterName()
967     old_ip = self.sstore.GetMasterIP()
968     if new_name == old_name and new_ip == old_ip:
969       raise errors.OpPrereqError("Neither the name nor the IP address of the"
970                                  " cluster has changed")
971     if new_ip != old_ip:
972       result = utils.RunCmd(["fping", "-q", new_ip])
973       if not result.failed:
974         raise errors.OpPrereqError("The given cluster IP address (%s) is"
975                                    " reachable on the network. Aborting." %
976                                    new_ip)
977
978     self.op.name = new_name
979
980   def Exec(self, feedback_fn):
981     """Rename the cluster.
982
983     """
984     clustername = self.op.name
985     ip = self.ip
986     ss = self.sstore
987
988     # shutdown the master IP
989     master = ss.GetMasterNode()
990     if not rpc.call_node_stop_master(master):
991       raise errors.OpExecError("Could not disable the master role")
992
993     try:
994       # modify the sstore
995       ss.SetKey(ss.SS_MASTER_IP, ip)
996       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
997
998       # Distribute updated ss config to all nodes
999       myself = self.cfg.GetNodeInfo(master)
1000       dist_nodes = self.cfg.GetNodeList()
1001       if myself.name in dist_nodes:
1002         dist_nodes.remove(myself.name)
1003
1004       logger.Debug("Copying updated ssconf data to all nodes")
1005       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1006         fname = ss.KeyToFilename(keyname)
1007         result = rpc.call_upload_file(dist_nodes, fname)
1008         for to_node in dist_nodes:
1009           if not result[to_node]:
1010             logger.Error("copy of file %s to node %s failed" %
1011                          (fname, to_node))
1012     finally:
1013       if not rpc.call_node_start_master(master):
1014         logger.Error("Could not re-enable the master role on the master,\n"
1015                      "please restart manually.")
1016
1017
1018 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1019   """Sleep and poll for an instance's disk to sync.
1020
1021   """
1022   if not instance.disks:
1023     return True
1024
1025   if not oneshot:
1026     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1027
1028   node = instance.primary_node
1029
1030   for dev in instance.disks:
1031     cfgw.SetDiskID(dev, node)
1032
1033   retries = 0
1034   while True:
1035     max_time = 0
1036     done = True
1037     cumul_degraded = False
1038     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1039     if not rstats:
1040       logger.ToStderr("Can't get any data from node %s" % node)
1041       retries += 1
1042       if retries >= 10:
1043         raise errors.RemoteError("Can't contact node %s for mirror data,"
1044                                  " aborting." % node)
1045       time.sleep(6)
1046       continue
1047     retries = 0
1048     for i in range(len(rstats)):
1049       mstat = rstats[i]
1050       if mstat is None:
1051         logger.ToStderr("Can't compute data for node %s/%s" %
1052                         (node, instance.disks[i].iv_name))
1053         continue
1054       perc_done, est_time, is_degraded = mstat
1055       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1056       if perc_done is not None:
1057         done = False
1058         if est_time is not None:
1059           rem_time = "%d estimated seconds remaining" % est_time
1060           max_time = est_time
1061         else:
1062           rem_time = "no time estimate"
1063         logger.ToStdout("- device %s: %5.2f%% done, %s" %
1064                         (instance.disks[i].iv_name, perc_done, rem_time))
1065     if done or oneshot:
1066       break
1067
1068     if unlock:
1069       utils.Unlock('cmd')
1070     try:
1071       time.sleep(min(60, max_time))
1072     finally:
1073       if unlock:
1074         utils.Lock('cmd')
1075
1076   if done:
1077     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1078   return not cumul_degraded
1079
1080
1081 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1082   """Check that mirrors are not degraded.
1083
1084   """
1085   cfgw.SetDiskID(dev, node)
1086
1087   result = True
1088   if on_primary or dev.AssembleOnSecondary():
1089     rstats = rpc.call_blockdev_find(node, dev)
1090     if not rstats:
1091       logger.ToStderr("Can't get any data from node %s" % node)
1092       result = False
1093     else:
1094       result = result and (not rstats[5])
1095   if dev.children:
1096     for child in dev.children:
1097       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1098
1099   return result
1100
1101
1102 class LUDiagnoseOS(NoHooksLU):
1103   """Logical unit for OS diagnose/query.
1104
1105   """
1106   _OP_REQP = []
1107
1108   def CheckPrereq(self):
1109     """Check prerequisites.
1110
1111     This always succeeds, since this is a pure query LU.
1112
1113     """
1114     return
1115
1116   def Exec(self, feedback_fn):
1117     """Compute the list of OSes.
1118
1119     """
1120     node_list = self.cfg.GetNodeList()
1121     node_data = rpc.call_os_diagnose(node_list)
1122     if node_data == False:
1123       raise errors.OpExecError("Can't gather the list of OSes")
1124     return node_data
1125
1126
1127 class LURemoveNode(LogicalUnit):
1128   """Logical unit for removing a node.
1129
1130   """
1131   HPATH = "node-remove"
1132   HTYPE = constants.HTYPE_NODE
1133   _OP_REQP = ["node_name"]
1134
1135   def BuildHooksEnv(self):
1136     """Build hooks env.
1137
1138     This doesn't run on the target node in the pre phase as a failed
1139     node would not allows itself to run.
1140
1141     """
1142     env = {
1143       "OP_TARGET": self.op.node_name,
1144       "NODE_NAME": self.op.node_name,
1145       }
1146     all_nodes = self.cfg.GetNodeList()
1147     all_nodes.remove(self.op.node_name)
1148     return env, all_nodes, all_nodes
1149
1150   def CheckPrereq(self):
1151     """Check prerequisites.
1152
1153     This checks:
1154      - the node exists in the configuration
1155      - it does not have primary or secondary instances
1156      - it's not the master
1157
1158     Any errors are signalled by raising errors.OpPrereqError.
1159
1160     """
1161     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1162     if node is None:
1163       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1164
1165     instance_list = self.cfg.GetInstanceList()
1166
1167     masternode = self.sstore.GetMasterNode()
1168     if node.name == masternode:
1169       raise errors.OpPrereqError("Node is the master node,"
1170                                  " you need to failover first.")
1171
1172     for instance_name in instance_list:
1173       instance = self.cfg.GetInstanceInfo(instance_name)
1174       if node.name == instance.primary_node:
1175         raise errors.OpPrereqError("Instance %s still running on the node,"
1176                                    " please remove first." % instance_name)
1177       if node.name in instance.secondary_nodes:
1178         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1179                                    " please remove first." % instance_name)
1180     self.op.node_name = node.name
1181     self.node = node
1182
1183   def Exec(self, feedback_fn):
1184     """Removes the node from the cluster.
1185
1186     """
1187     node = self.node
1188     logger.Info("stopping the node daemon and removing configs from node %s" %
1189                 node.name)
1190
1191     rpc.call_node_leave_cluster(node.name)
1192
1193     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1194
1195     logger.Info("Removing node %s from config" % node.name)
1196
1197     self.cfg.RemoveNode(node.name)
1198
1199
1200 class LUQueryNodes(NoHooksLU):
1201   """Logical unit for querying nodes.
1202
1203   """
1204   _OP_REQP = ["output_fields", "names"]
1205
1206   def CheckPrereq(self):
1207     """Check prerequisites.
1208
1209     This checks that the fields required are valid output fields.
1210
1211     """
1212     self.dynamic_fields = frozenset(["dtotal", "dfree",
1213                                      "mtotal", "mnode", "mfree",
1214                                      "bootid"])
1215
1216     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1217                                "pinst_list", "sinst_list",
1218                                "pip", "sip"],
1219                        dynamic=self.dynamic_fields,
1220                        selected=self.op.output_fields)
1221
1222     self.wanted = _GetWantedNodes(self, self.op.names)
1223
1224   def Exec(self, feedback_fn):
1225     """Computes the list of nodes and their attributes.
1226
1227     """
1228     nodenames = self.wanted
1229     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1230
1231     # begin data gathering
1232
1233     if self.dynamic_fields.intersection(self.op.output_fields):
1234       live_data = {}
1235       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1236       for name in nodenames:
1237         nodeinfo = node_data.get(name, None)
1238         if nodeinfo:
1239           live_data[name] = {
1240             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1241             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1242             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1243             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1244             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1245             "bootid": nodeinfo['bootid'],
1246             }
1247         else:
1248           live_data[name] = {}
1249     else:
1250       live_data = dict.fromkeys(nodenames, {})
1251
1252     node_to_primary = dict([(name, set()) for name in nodenames])
1253     node_to_secondary = dict([(name, set()) for name in nodenames])
1254
1255     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1256                              "sinst_cnt", "sinst_list"))
1257     if inst_fields & frozenset(self.op.output_fields):
1258       instancelist = self.cfg.GetInstanceList()
1259
1260       for instance_name in instancelist:
1261         inst = self.cfg.GetInstanceInfo(instance_name)
1262         if inst.primary_node in node_to_primary:
1263           node_to_primary[inst.primary_node].add(inst.name)
1264         for secnode in inst.secondary_nodes:
1265           if secnode in node_to_secondary:
1266             node_to_secondary[secnode].add(inst.name)
1267
1268     # end data gathering
1269
1270     output = []
1271     for node in nodelist:
1272       node_output = []
1273       for field in self.op.output_fields:
1274         if field == "name":
1275           val = node.name
1276         elif field == "pinst_list":
1277           val = list(node_to_primary[node.name])
1278         elif field == "sinst_list":
1279           val = list(node_to_secondary[node.name])
1280         elif field == "pinst_cnt":
1281           val = len(node_to_primary[node.name])
1282         elif field == "sinst_cnt":
1283           val = len(node_to_secondary[node.name])
1284         elif field == "pip":
1285           val = node.primary_ip
1286         elif field == "sip":
1287           val = node.secondary_ip
1288         elif field in self.dynamic_fields:
1289           val = live_data[node.name].get(field, None)
1290         else:
1291           raise errors.ParameterError(field)
1292         node_output.append(val)
1293       output.append(node_output)
1294
1295     return output
1296
1297
1298 class LUQueryNodeVolumes(NoHooksLU):
1299   """Logical unit for getting volumes on node(s).
1300
1301   """
1302   _OP_REQP = ["nodes", "output_fields"]
1303
1304   def CheckPrereq(self):
1305     """Check prerequisites.
1306
1307     This checks that the fields required are valid output fields.
1308
1309     """
1310     self.nodes = _GetWantedNodes(self, self.op.nodes)
1311
1312     _CheckOutputFields(static=["node"],
1313                        dynamic=["phys", "vg", "name", "size", "instance"],
1314                        selected=self.op.output_fields)
1315
1316
1317   def Exec(self, feedback_fn):
1318     """Computes the list of nodes and their attributes.
1319
1320     """
1321     nodenames = self.nodes
1322     volumes = rpc.call_node_volumes(nodenames)
1323
1324     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1325              in self.cfg.GetInstanceList()]
1326
1327     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1328
1329     output = []
1330     for node in nodenames:
1331       if node not in volumes or not volumes[node]:
1332         continue
1333
1334       node_vols = volumes[node][:]
1335       node_vols.sort(key=lambda vol: vol['dev'])
1336
1337       for vol in node_vols:
1338         node_output = []
1339         for field in self.op.output_fields:
1340           if field == "node":
1341             val = node
1342           elif field == "phys":
1343             val = vol['dev']
1344           elif field == "vg":
1345             val = vol['vg']
1346           elif field == "name":
1347             val = vol['name']
1348           elif field == "size":
1349             val = int(float(vol['size']))
1350           elif field == "instance":
1351             for inst in ilist:
1352               if node not in lv_by_node[inst]:
1353                 continue
1354               if vol['name'] in lv_by_node[inst][node]:
1355                 val = inst.name
1356                 break
1357             else:
1358               val = '-'
1359           else:
1360             raise errors.ParameterError(field)
1361           node_output.append(str(val))
1362
1363         output.append(node_output)
1364
1365     return output
1366
1367
1368 class LUAddNode(LogicalUnit):
1369   """Logical unit for adding node to the cluster.
1370
1371   """
1372   HPATH = "node-add"
1373   HTYPE = constants.HTYPE_NODE
1374   _OP_REQP = ["node_name"]
1375
1376   def BuildHooksEnv(self):
1377     """Build hooks env.
1378
1379     This will run on all nodes before, and on all nodes + the new node after.
1380
1381     """
1382     env = {
1383       "OP_TARGET": self.op.node_name,
1384       "NODE_NAME": self.op.node_name,
1385       "NODE_PIP": self.op.primary_ip,
1386       "NODE_SIP": self.op.secondary_ip,
1387       }
1388     nodes_0 = self.cfg.GetNodeList()
1389     nodes_1 = nodes_0 + [self.op.node_name, ]
1390     return env, nodes_0, nodes_1
1391
1392   def CheckPrereq(self):
1393     """Check prerequisites.
1394
1395     This checks:
1396      - the new node is not already in the config
1397      - it is resolvable
1398      - its parameters (single/dual homed) matches the cluster
1399
1400     Any errors are signalled by raising errors.OpPrereqError.
1401
1402     """
1403     node_name = self.op.node_name
1404     cfg = self.cfg
1405
1406     dns_data = utils.HostInfo(node_name)
1407
1408     node = dns_data.name
1409     primary_ip = self.op.primary_ip = dns_data.ip
1410     secondary_ip = getattr(self.op, "secondary_ip", None)
1411     if secondary_ip is None:
1412       secondary_ip = primary_ip
1413     if not utils.IsValidIP(secondary_ip):
1414       raise errors.OpPrereqError("Invalid secondary IP given")
1415     self.op.secondary_ip = secondary_ip
1416     node_list = cfg.GetNodeList()
1417     if node in node_list:
1418       raise errors.OpPrereqError("Node %s is already in the configuration"
1419                                  % node)
1420
1421     for existing_node_name in node_list:
1422       existing_node = cfg.GetNodeInfo(existing_node_name)
1423       if (existing_node.primary_ip == primary_ip or
1424           existing_node.secondary_ip == primary_ip or
1425           existing_node.primary_ip == secondary_ip or
1426           existing_node.secondary_ip == secondary_ip):
1427         raise errors.OpPrereqError("New node ip address(es) conflict with"
1428                                    " existing node %s" % existing_node.name)
1429
1430     # check that the type of the node (single versus dual homed) is the
1431     # same as for the master
1432     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1433     master_singlehomed = myself.secondary_ip == myself.primary_ip
1434     newbie_singlehomed = secondary_ip == primary_ip
1435     if master_singlehomed != newbie_singlehomed:
1436       if master_singlehomed:
1437         raise errors.OpPrereqError("The master has no private ip but the"
1438                                    " new node has one")
1439       else:
1440         raise errors.OpPrereqError("The master has a private ip but the"
1441                                    " new node doesn't have one")
1442
1443     # checks reachablity
1444     if not utils.TcpPing(utils.HostInfo().name,
1445                          primary_ip,
1446                          constants.DEFAULT_NODED_PORT):
1447       raise errors.OpPrereqError("Node not reachable by ping")
1448
1449     if not newbie_singlehomed:
1450       # check reachability from my secondary ip to newbie's secondary ip
1451       if not utils.TcpPing(myself.secondary_ip,
1452                            secondary_ip,
1453                            constants.DEFAULT_NODED_PORT):
1454         raise errors.OpPrereqError(
1455           "Node secondary ip not reachable by TCP based ping to noded port")
1456
1457     self.new_node = objects.Node(name=node,
1458                                  primary_ip=primary_ip,
1459                                  secondary_ip=secondary_ip)
1460
1461   def Exec(self, feedback_fn):
1462     """Adds the new node to the cluster.
1463
1464     """
1465     new_node = self.new_node
1466     node = new_node.name
1467
1468     # set up inter-node password and certificate and restarts the node daemon
1469     gntpass = self.sstore.GetNodeDaemonPassword()
1470     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1471       raise errors.OpExecError("ganeti password corruption detected")
1472     f = open(constants.SSL_CERT_FILE)
1473     try:
1474       gntpem = f.read(8192)
1475     finally:
1476       f.close()
1477     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1478     # so we use this to detect an invalid certificate; as long as the
1479     # cert doesn't contain this, the here-document will be correctly
1480     # parsed by the shell sequence below
1481     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1482       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1483     if not gntpem.endswith("\n"):
1484       raise errors.OpExecError("PEM must end with newline")
1485     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1486
1487     # and then connect with ssh to set password and start ganeti-noded
1488     # note that all the below variables are sanitized at this point,
1489     # either by being constants or by the checks above
1490     ss = self.sstore
1491     mycommand = ("umask 077 && "
1492                  "echo '%s' > '%s' && "
1493                  "cat > '%s' << '!EOF.' && \n"
1494                  "%s!EOF.\n%s restart" %
1495                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1496                   constants.SSL_CERT_FILE, gntpem,
1497                   constants.NODE_INITD_SCRIPT))
1498
1499     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1500     if result.failed:
1501       raise errors.OpExecError("Remote command on node %s, error: %s,"
1502                                " output: %s" %
1503                                (node, result.fail_reason, result.output))
1504
1505     # check connectivity
1506     time.sleep(4)
1507
1508     result = rpc.call_version([node])[node]
1509     if result:
1510       if constants.PROTOCOL_VERSION == result:
1511         logger.Info("communication to node %s fine, sw version %s match" %
1512                     (node, result))
1513       else:
1514         raise errors.OpExecError("Version mismatch master version %s,"
1515                                  " node version %s" %
1516                                  (constants.PROTOCOL_VERSION, result))
1517     else:
1518       raise errors.OpExecError("Cannot get version from the new node")
1519
1520     # setup ssh on node
1521     logger.Info("copy ssh key to node %s" % node)
1522     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1523     keyarray = []
1524     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1525                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1526                 priv_key, pub_key]
1527
1528     for i in keyfiles:
1529       f = open(i, 'r')
1530       try:
1531         keyarray.append(f.read())
1532       finally:
1533         f.close()
1534
1535     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1536                                keyarray[3], keyarray[4], keyarray[5])
1537
1538     if not result:
1539       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1540
1541     # Add node to our /etc/hosts, and add key to known_hosts
1542     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1543     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1544                       self.cfg.GetHostKey())
1545
1546     if new_node.secondary_ip != new_node.primary_ip:
1547       if not rpc.call_node_tcp_ping(new_node.name,
1548                                     constants.LOCALHOST_IP_ADDRESS,
1549                                     new_node.secondary_ip,
1550                                     constants.DEFAULT_NODED_PORT,
1551                                     10, False):
1552         raise errors.OpExecError("Node claims it doesn't have the"
1553                                  " secondary ip you gave (%s).\n"
1554                                  "Please fix and re-run this command." %
1555                                  new_node.secondary_ip)
1556
1557     success, msg = ssh.VerifyNodeHostname(node)
1558     if not success:
1559       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1560                                " than the one the resolver gives: %s.\n"
1561                                "Please fix and re-run this command." %
1562                                (node, msg))
1563
1564     # Distribute updated /etc/hosts and known_hosts to all nodes,
1565     # including the node just added
1566     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1567     dist_nodes = self.cfg.GetNodeList() + [node]
1568     if myself.name in dist_nodes:
1569       dist_nodes.remove(myself.name)
1570
1571     logger.Debug("Copying hosts and known_hosts to all nodes")
1572     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1573       result = rpc.call_upload_file(dist_nodes, fname)
1574       for to_node in dist_nodes:
1575         if not result[to_node]:
1576           logger.Error("copy of file %s to node %s failed" %
1577                        (fname, to_node))
1578
1579     to_copy = ss.GetFileList()
1580     for fname in to_copy:
1581       if not ssh.CopyFileToNode(node, fname):
1582         logger.Error("could not copy file %s to node %s" % (fname, node))
1583
1584     logger.Info("adding node %s to cluster.conf" % node)
1585     self.cfg.AddNode(new_node)
1586
1587
1588 class LUMasterFailover(LogicalUnit):
1589   """Failover the master node to the current node.
1590
1591   This is a special LU in that it must run on a non-master node.
1592
1593   """
1594   HPATH = "master-failover"
1595   HTYPE = constants.HTYPE_CLUSTER
1596   REQ_MASTER = False
1597   _OP_REQP = []
1598
1599   def BuildHooksEnv(self):
1600     """Build hooks env.
1601
1602     This will run on the new master only in the pre phase, and on all
1603     the nodes in the post phase.
1604
1605     """
1606     env = {
1607       "OP_TARGET": self.new_master,
1608       "NEW_MASTER": self.new_master,
1609       "OLD_MASTER": self.old_master,
1610       }
1611     return env, [self.new_master], self.cfg.GetNodeList()
1612
1613   def CheckPrereq(self):
1614     """Check prerequisites.
1615
1616     This checks that we are not already the master.
1617
1618     """
1619     self.new_master = utils.HostInfo().name
1620     self.old_master = self.sstore.GetMasterNode()
1621
1622     if self.old_master == self.new_master:
1623       raise errors.OpPrereqError("This commands must be run on the node"
1624                                  " where you want the new master to be.\n"
1625                                  "%s is already the master" %
1626                                  self.old_master)
1627
1628   def Exec(self, feedback_fn):
1629     """Failover the master node.
1630
1631     This command, when run on a non-master node, will cause the current
1632     master to cease being master, and the non-master to become new
1633     master.
1634
1635     """
1636     #TODO: do not rely on gethostname returning the FQDN
1637     logger.Info("setting master to %s, old master: %s" %
1638                 (self.new_master, self.old_master))
1639
1640     if not rpc.call_node_stop_master(self.old_master):
1641       logger.Error("could disable the master role on the old master"
1642                    " %s, please disable manually" % self.old_master)
1643
1644     ss = self.sstore
1645     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1646     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1647                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1648       logger.Error("could not distribute the new simple store master file"
1649                    " to the other nodes, please check.")
1650
1651     if not rpc.call_node_start_master(self.new_master):
1652       logger.Error("could not start the master role on the new master"
1653                    " %s, please check" % self.new_master)
1654       feedback_fn("Error in activating the master IP on the new master,\n"
1655                   "please fix manually.")
1656
1657
1658
1659 class LUQueryClusterInfo(NoHooksLU):
1660   """Query cluster configuration.
1661
1662   """
1663   _OP_REQP = []
1664   REQ_MASTER = False
1665
1666   def CheckPrereq(self):
1667     """No prerequsites needed for this LU.
1668
1669     """
1670     pass
1671
1672   def Exec(self, feedback_fn):
1673     """Return cluster config.
1674
1675     """
1676     result = {
1677       "name": self.sstore.GetClusterName(),
1678       "software_version": constants.RELEASE_VERSION,
1679       "protocol_version": constants.PROTOCOL_VERSION,
1680       "config_version": constants.CONFIG_VERSION,
1681       "os_api_version": constants.OS_API_VERSION,
1682       "export_version": constants.EXPORT_VERSION,
1683       "master": self.sstore.GetMasterNode(),
1684       "architecture": (platform.architecture()[0], platform.machine()),
1685       }
1686
1687     return result
1688
1689
1690 class LUClusterCopyFile(NoHooksLU):
1691   """Copy file to cluster.
1692
1693   """
1694   _OP_REQP = ["nodes", "filename"]
1695
1696   def CheckPrereq(self):
1697     """Check prerequisites.
1698
1699     It should check that the named file exists and that the given list
1700     of nodes is valid.
1701
1702     """
1703     if not os.path.exists(self.op.filename):
1704       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1705
1706     self.nodes = _GetWantedNodes(self, self.op.nodes)
1707
1708   def Exec(self, feedback_fn):
1709     """Copy a file from master to some nodes.
1710
1711     Args:
1712       opts - class with options as members
1713       args - list containing a single element, the file name
1714     Opts used:
1715       nodes - list containing the name of target nodes; if empty, all nodes
1716
1717     """
1718     filename = self.op.filename
1719
1720     myname = utils.HostInfo().name
1721
1722     for node in self.nodes:
1723       if node == myname:
1724         continue
1725       if not ssh.CopyFileToNode(node, filename):
1726         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1727
1728
1729 class LUDumpClusterConfig(NoHooksLU):
1730   """Return a text-representation of the cluster-config.
1731
1732   """
1733   _OP_REQP = []
1734
1735   def CheckPrereq(self):
1736     """No prerequisites.
1737
1738     """
1739     pass
1740
1741   def Exec(self, feedback_fn):
1742     """Dump a representation of the cluster config to the standard output.
1743
1744     """
1745     return self.cfg.DumpConfig()
1746
1747
1748 class LURunClusterCommand(NoHooksLU):
1749   """Run a command on some nodes.
1750
1751   """
1752   _OP_REQP = ["command", "nodes"]
1753
1754   def CheckPrereq(self):
1755     """Check prerequisites.
1756
1757     It checks that the given list of nodes is valid.
1758
1759     """
1760     self.nodes = _GetWantedNodes(self, self.op.nodes)
1761
1762   def Exec(self, feedback_fn):
1763     """Run a command on some nodes.
1764
1765     """
1766     data = []
1767     for node in self.nodes:
1768       result = ssh.SSHCall(node, "root", self.op.command)
1769       data.append((node, result.output, result.exit_code))
1770
1771     return data
1772
1773
1774 class LUActivateInstanceDisks(NoHooksLU):
1775   """Bring up an instance's disks.
1776
1777   """
1778   _OP_REQP = ["instance_name"]
1779
1780   def CheckPrereq(self):
1781     """Check prerequisites.
1782
1783     This checks that the instance is in the cluster.
1784
1785     """
1786     instance = self.cfg.GetInstanceInfo(
1787       self.cfg.ExpandInstanceName(self.op.instance_name))
1788     if instance is None:
1789       raise errors.OpPrereqError("Instance '%s' not known" %
1790                                  self.op.instance_name)
1791     self.instance = instance
1792
1793
1794   def Exec(self, feedback_fn):
1795     """Activate the disks.
1796
1797     """
1798     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1799     if not disks_ok:
1800       raise errors.OpExecError("Cannot activate block devices")
1801
1802     return disks_info
1803
1804
1805 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1806   """Prepare the block devices for an instance.
1807
1808   This sets up the block devices on all nodes.
1809
1810   Args:
1811     instance: a ganeti.objects.Instance object
1812     ignore_secondaries: if true, errors on secondary nodes won't result
1813                         in an error return from the function
1814
1815   Returns:
1816     false if the operation failed
1817     list of (host, instance_visible_name, node_visible_name) if the operation
1818          suceeded with the mapping from node devices to instance devices
1819   """
1820   device_info = []
1821   disks_ok = True
1822   for inst_disk in instance.disks:
1823     master_result = None
1824     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1825       cfg.SetDiskID(node_disk, node)
1826       is_primary = node == instance.primary_node
1827       result = rpc.call_blockdev_assemble(node, node_disk,
1828                                           instance.name, is_primary)
1829       if not result:
1830         logger.Error("could not prepare block device %s on node %s (is_pri"
1831                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1832         if is_primary or not ignore_secondaries:
1833           disks_ok = False
1834       if is_primary:
1835         master_result = result
1836     device_info.append((instance.primary_node, inst_disk.iv_name,
1837                         master_result))
1838
1839   # leave the disks configured for the primary node
1840   # this is a workaround that would be fixed better by
1841   # improving the logical/physical id handling
1842   for disk in instance.disks:
1843     cfg.SetDiskID(disk, instance.primary_node)
1844
1845   return disks_ok, device_info
1846
1847
1848 def _StartInstanceDisks(cfg, instance, force):
1849   """Start the disks of an instance.
1850
1851   """
1852   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1853                                            ignore_secondaries=force)
1854   if not disks_ok:
1855     _ShutdownInstanceDisks(instance, cfg)
1856     if force is not None and not force:
1857       logger.Error("If the message above refers to a secondary node,"
1858                    " you can retry the operation using '--force'.")
1859     raise errors.OpExecError("Disk consistency error")
1860
1861
1862 class LUDeactivateInstanceDisks(NoHooksLU):
1863   """Shutdown an instance's disks.
1864
1865   """
1866   _OP_REQP = ["instance_name"]
1867
1868   def CheckPrereq(self):
1869     """Check prerequisites.
1870
1871     This checks that the instance is in the cluster.
1872
1873     """
1874     instance = self.cfg.GetInstanceInfo(
1875       self.cfg.ExpandInstanceName(self.op.instance_name))
1876     if instance is None:
1877       raise errors.OpPrereqError("Instance '%s' not known" %
1878                                  self.op.instance_name)
1879     self.instance = instance
1880
1881   def Exec(self, feedback_fn):
1882     """Deactivate the disks
1883
1884     """
1885     instance = self.instance
1886     ins_l = rpc.call_instance_list([instance.primary_node])
1887     ins_l = ins_l[instance.primary_node]
1888     if not type(ins_l) is list:
1889       raise errors.OpExecError("Can't contact node '%s'" %
1890                                instance.primary_node)
1891
1892     if self.instance.name in ins_l:
1893       raise errors.OpExecError("Instance is running, can't shutdown"
1894                                " block devices.")
1895
1896     _ShutdownInstanceDisks(instance, self.cfg)
1897
1898
1899 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1900   """Shutdown block devices of an instance.
1901
1902   This does the shutdown on all nodes of the instance.
1903
1904   If the ignore_primary is false, errors on the primary node are
1905   ignored.
1906
1907   """
1908   result = True
1909   for disk in instance.disks:
1910     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1911       cfg.SetDiskID(top_disk, node)
1912       if not rpc.call_blockdev_shutdown(node, top_disk):
1913         logger.Error("could not shutdown block device %s on node %s" %
1914                      (disk.iv_name, node))
1915         if not ignore_primary or node != instance.primary_node:
1916           result = False
1917   return result
1918
1919
1920 class LUStartupInstance(LogicalUnit):
1921   """Starts an instance.
1922
1923   """
1924   HPATH = "instance-start"
1925   HTYPE = constants.HTYPE_INSTANCE
1926   _OP_REQP = ["instance_name", "force"]
1927
1928   def BuildHooksEnv(self):
1929     """Build hooks env.
1930
1931     This runs on master, primary and secondary nodes of the instance.
1932
1933     """
1934     env = {
1935       "FORCE": self.op.force,
1936       }
1937     env.update(_BuildInstanceHookEnvByObject(self.instance))
1938     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1939           list(self.instance.secondary_nodes))
1940     return env, nl, nl
1941
1942   def CheckPrereq(self):
1943     """Check prerequisites.
1944
1945     This checks that the instance is in the cluster.
1946
1947     """
1948     instance = self.cfg.GetInstanceInfo(
1949       self.cfg.ExpandInstanceName(self.op.instance_name))
1950     if instance is None:
1951       raise errors.OpPrereqError("Instance '%s' not known" %
1952                                  self.op.instance_name)
1953
1954     # check bridges existance
1955     _CheckInstanceBridgesExist(instance)
1956
1957     self.instance = instance
1958     self.op.instance_name = instance.name
1959
1960   def Exec(self, feedback_fn):
1961     """Start the instance.
1962
1963     """
1964     instance = self.instance
1965     force = self.op.force
1966     extra_args = getattr(self.op, "extra_args", "")
1967
1968     node_current = instance.primary_node
1969
1970     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1971     if not nodeinfo:
1972       raise errors.OpExecError("Could not contact node %s for infos" %
1973                                (node_current))
1974
1975     freememory = nodeinfo[node_current]['memory_free']
1976     memory = instance.memory
1977     if memory > freememory:
1978       raise errors.OpExecError("Not enough memory to start instance"
1979                                " %s on node %s"
1980                                " needed %s MiB, available %s MiB" %
1981                                (instance.name, node_current, memory,
1982                                 freememory))
1983
1984     _StartInstanceDisks(self.cfg, instance, force)
1985
1986     if not rpc.call_instance_start(node_current, instance, extra_args):
1987       _ShutdownInstanceDisks(instance, self.cfg)
1988       raise errors.OpExecError("Could not start instance")
1989
1990     self.cfg.MarkInstanceUp(instance.name)
1991
1992
1993 class LURebootInstance(LogicalUnit):
1994   """Reboot an instance.
1995
1996   """
1997   HPATH = "instance-reboot"
1998   HTYPE = constants.HTYPE_INSTANCE
1999   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2000
2001   def BuildHooksEnv(self):
2002     """Build hooks env.
2003
2004     This runs on master, primary and secondary nodes of the instance.
2005
2006     """
2007     env = {
2008       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2009       }
2010     env.update(_BuildInstanceHookEnvByObject(self.instance))
2011     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2012           list(self.instance.secondary_nodes))
2013     return env, nl, nl
2014
2015   def CheckPrereq(self):
2016     """Check prerequisites.
2017
2018     This checks that the instance is in the cluster.
2019
2020     """
2021     instance = self.cfg.GetInstanceInfo(
2022       self.cfg.ExpandInstanceName(self.op.instance_name))
2023     if instance is None:
2024       raise errors.OpPrereqError("Instance '%s' not known" %
2025                                  self.op.instance_name)
2026
2027     # check bridges existance
2028     _CheckInstanceBridgesExist(instance)
2029
2030     self.instance = instance
2031     self.op.instance_name = instance.name
2032
2033   def Exec(self, feedback_fn):
2034     """Reboot the instance.
2035
2036     """
2037     instance = self.instance
2038     ignore_secondaries = self.op.ignore_secondaries
2039     reboot_type = self.op.reboot_type
2040     extra_args = getattr(self.op, "extra_args", "")
2041
2042     node_current = instance.primary_node
2043
2044     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2045                            constants.INSTANCE_REBOOT_HARD,
2046                            constants.INSTANCE_REBOOT_FULL]:
2047       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2048                                   (constants.INSTANCE_REBOOT_SOFT,
2049                                    constants.INSTANCE_REBOOT_HARD,
2050                                    constants.INSTANCE_REBOOT_FULL))
2051
2052     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2053                        constants.INSTANCE_REBOOT_HARD]:
2054       if not rpc.call_instance_reboot(node_current, instance,
2055                                       reboot_type, extra_args):
2056         raise errors.OpExecError("Could not reboot instance")
2057     else:
2058       if not rpc.call_instance_shutdown(node_current, instance):
2059         raise errors.OpExecError("could not shutdown instance for full reboot")
2060       _ShutdownInstanceDisks(instance, self.cfg)
2061       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2062       if not rpc.call_instance_start(node_current, instance, extra_args):
2063         _ShutdownInstanceDisks(instance, self.cfg)
2064         raise errors.OpExecError("Could not start instance for full reboot")
2065
2066     self.cfg.MarkInstanceUp(instance.name)
2067
2068
2069 class LUShutdownInstance(LogicalUnit):
2070   """Shutdown an instance.
2071
2072   """
2073   HPATH = "instance-stop"
2074   HTYPE = constants.HTYPE_INSTANCE
2075   _OP_REQP = ["instance_name"]
2076
2077   def BuildHooksEnv(self):
2078     """Build hooks env.
2079
2080     This runs on master, primary and secondary nodes of the instance.
2081
2082     """
2083     env = _BuildInstanceHookEnvByObject(self.instance)
2084     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2085           list(self.instance.secondary_nodes))
2086     return env, nl, nl
2087
2088   def CheckPrereq(self):
2089     """Check prerequisites.
2090
2091     This checks that the instance is in the cluster.
2092
2093     """
2094     instance = self.cfg.GetInstanceInfo(
2095       self.cfg.ExpandInstanceName(self.op.instance_name))
2096     if instance is None:
2097       raise errors.OpPrereqError("Instance '%s' not known" %
2098                                  self.op.instance_name)
2099     self.instance = instance
2100
2101   def Exec(self, feedback_fn):
2102     """Shutdown the instance.
2103
2104     """
2105     instance = self.instance
2106     node_current = instance.primary_node
2107     if not rpc.call_instance_shutdown(node_current, instance):
2108       logger.Error("could not shutdown instance")
2109
2110     self.cfg.MarkInstanceDown(instance.name)
2111     _ShutdownInstanceDisks(instance, self.cfg)
2112
2113
2114 class LUReinstallInstance(LogicalUnit):
2115   """Reinstall an instance.
2116
2117   """
2118   HPATH = "instance-reinstall"
2119   HTYPE = constants.HTYPE_INSTANCE
2120   _OP_REQP = ["instance_name"]
2121
2122   def BuildHooksEnv(self):
2123     """Build hooks env.
2124
2125     This runs on master, primary and secondary nodes of the instance.
2126
2127     """
2128     env = _BuildInstanceHookEnvByObject(self.instance)
2129     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2130           list(self.instance.secondary_nodes))
2131     return env, nl, nl
2132
2133   def CheckPrereq(self):
2134     """Check prerequisites.
2135
2136     This checks that the instance is in the cluster and is not running.
2137
2138     """
2139     instance = self.cfg.GetInstanceInfo(
2140       self.cfg.ExpandInstanceName(self.op.instance_name))
2141     if instance is None:
2142       raise errors.OpPrereqError("Instance '%s' not known" %
2143                                  self.op.instance_name)
2144     if instance.disk_template == constants.DT_DISKLESS:
2145       raise errors.OpPrereqError("Instance '%s' has no disks" %
2146                                  self.op.instance_name)
2147     if instance.status != "down":
2148       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2149                                  self.op.instance_name)
2150     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2151     if remote_info:
2152       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2153                                  (self.op.instance_name,
2154                                   instance.primary_node))
2155
2156     self.op.os_type = getattr(self.op, "os_type", None)
2157     if self.op.os_type is not None:
2158       # OS verification
2159       pnode = self.cfg.GetNodeInfo(
2160         self.cfg.ExpandNodeName(instance.primary_node))
2161       if pnode is None:
2162         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2163                                    self.op.pnode)
2164       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2165       if not os_obj:
2166         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2167                                    " primary node"  % self.op.os_type)
2168
2169     self.instance = instance
2170
2171   def Exec(self, feedback_fn):
2172     """Reinstall the instance.
2173
2174     """
2175     inst = self.instance
2176
2177     if self.op.os_type is not None:
2178       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2179       inst.os = self.op.os_type
2180       self.cfg.AddInstance(inst)
2181
2182     _StartInstanceDisks(self.cfg, inst, None)
2183     try:
2184       feedback_fn("Running the instance OS create scripts...")
2185       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2186         raise errors.OpExecError("Could not install OS for instance %s "
2187                                  "on node %s" %
2188                                  (inst.name, inst.primary_node))
2189     finally:
2190       _ShutdownInstanceDisks(inst, self.cfg)
2191
2192
2193 class LURenameInstance(LogicalUnit):
2194   """Rename an instance.
2195
2196   """
2197   HPATH = "instance-rename"
2198   HTYPE = constants.HTYPE_INSTANCE
2199   _OP_REQP = ["instance_name", "new_name"]
2200
2201   def BuildHooksEnv(self):
2202     """Build hooks env.
2203
2204     This runs on master, primary and secondary nodes of the instance.
2205
2206     """
2207     env = _BuildInstanceHookEnvByObject(self.instance)
2208     env["INSTANCE_NEW_NAME"] = self.op.new_name
2209     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2210           list(self.instance.secondary_nodes))
2211     return env, nl, nl
2212
2213   def CheckPrereq(self):
2214     """Check prerequisites.
2215
2216     This checks that the instance is in the cluster and is not running.
2217
2218     """
2219     instance = self.cfg.GetInstanceInfo(
2220       self.cfg.ExpandInstanceName(self.op.instance_name))
2221     if instance is None:
2222       raise errors.OpPrereqError("Instance '%s' not known" %
2223                                  self.op.instance_name)
2224     if instance.status != "down":
2225       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2226                                  self.op.instance_name)
2227     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2228     if remote_info:
2229       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2230                                  (self.op.instance_name,
2231                                   instance.primary_node))
2232     self.instance = instance
2233
2234     # new name verification
2235     name_info = utils.HostInfo(self.op.new_name)
2236
2237     self.op.new_name = new_name = name_info.name
2238     if not getattr(self.op, "ignore_ip", False):
2239       command = ["fping", "-q", name_info.ip]
2240       result = utils.RunCmd(command)
2241       if not result.failed:
2242         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2243                                    (name_info.ip, new_name))
2244
2245
2246   def Exec(self, feedback_fn):
2247     """Reinstall the instance.
2248
2249     """
2250     inst = self.instance
2251     old_name = inst.name
2252
2253     self.cfg.RenameInstance(inst.name, self.op.new_name)
2254
2255     # re-read the instance from the configuration after rename
2256     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2257
2258     _StartInstanceDisks(self.cfg, inst, None)
2259     try:
2260       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2261                                           "sda", "sdb"):
2262         msg = ("Could run OS rename script for instance %s\n"
2263                "on node %s\n"
2264                "(but the instance has been renamed in Ganeti)" %
2265                (inst.name, inst.primary_node))
2266         logger.Error(msg)
2267     finally:
2268       _ShutdownInstanceDisks(inst, self.cfg)
2269
2270
2271 class LURemoveInstance(LogicalUnit):
2272   """Remove an instance.
2273
2274   """
2275   HPATH = "instance-remove"
2276   HTYPE = constants.HTYPE_INSTANCE
2277   _OP_REQP = ["instance_name"]
2278
2279   def BuildHooksEnv(self):
2280     """Build hooks env.
2281
2282     This runs on master, primary and secondary nodes of the instance.
2283
2284     """
2285     env = _BuildInstanceHookEnvByObject(self.instance)
2286     nl = [self.sstore.GetMasterNode()]
2287     return env, nl, nl
2288
2289   def CheckPrereq(self):
2290     """Check prerequisites.
2291
2292     This checks that the instance is in the cluster.
2293
2294     """
2295     instance = self.cfg.GetInstanceInfo(
2296       self.cfg.ExpandInstanceName(self.op.instance_name))
2297     if instance is None:
2298       raise errors.OpPrereqError("Instance '%s' not known" %
2299                                  self.op.instance_name)
2300     self.instance = instance
2301
2302   def Exec(self, feedback_fn):
2303     """Remove the instance.
2304
2305     """
2306     instance = self.instance
2307     logger.Info("shutting down instance %s on node %s" %
2308                 (instance.name, instance.primary_node))
2309
2310     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2311       if self.op.ignore_failures:
2312         feedback_fn("Warning: can't shutdown instance")
2313       else:
2314         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2315                                  (instance.name, instance.primary_node))
2316
2317     logger.Info("removing block devices for instance %s" % instance.name)
2318
2319     if not _RemoveDisks(instance, self.cfg):
2320       if self.op.ignore_failures:
2321         feedback_fn("Warning: can't remove instance's disks")
2322       else:
2323         raise errors.OpExecError("Can't remove instance's disks")
2324
2325     logger.Info("removing instance %s out of cluster config" % instance.name)
2326
2327     self.cfg.RemoveInstance(instance.name)
2328
2329
2330 class LUQueryInstances(NoHooksLU):
2331   """Logical unit for querying instances.
2332
2333   """
2334   _OP_REQP = ["output_fields", "names"]
2335
2336   def CheckPrereq(self):
2337     """Check prerequisites.
2338
2339     This checks that the fields required are valid output fields.
2340
2341     """
2342     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2343     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2344                                "admin_state", "admin_ram",
2345                                "disk_template", "ip", "mac", "bridge",
2346                                "sda_size", "sdb_size"],
2347                        dynamic=self.dynamic_fields,
2348                        selected=self.op.output_fields)
2349
2350     self.wanted = _GetWantedInstances(self, self.op.names)
2351
2352   def Exec(self, feedback_fn):
2353     """Computes the list of nodes and their attributes.
2354
2355     """
2356     instance_names = self.wanted
2357     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2358                      in instance_names]
2359
2360     # begin data gathering
2361
2362     nodes = frozenset([inst.primary_node for inst in instance_list])
2363
2364     bad_nodes = []
2365     if self.dynamic_fields.intersection(self.op.output_fields):
2366       live_data = {}
2367       node_data = rpc.call_all_instances_info(nodes)
2368       for name in nodes:
2369         result = node_data[name]
2370         if result:
2371           live_data.update(result)
2372         elif result == False:
2373           bad_nodes.append(name)
2374         # else no instance is alive
2375     else:
2376       live_data = dict([(name, {}) for name in instance_names])
2377
2378     # end data gathering
2379
2380     output = []
2381     for instance in instance_list:
2382       iout = []
2383       for field in self.op.output_fields:
2384         if field == "name":
2385           val = instance.name
2386         elif field == "os":
2387           val = instance.os
2388         elif field == "pnode":
2389           val = instance.primary_node
2390         elif field == "snodes":
2391           val = list(instance.secondary_nodes)
2392         elif field == "admin_state":
2393           val = (instance.status != "down")
2394         elif field == "oper_state":
2395           if instance.primary_node in bad_nodes:
2396             val = None
2397           else:
2398             val = bool(live_data.get(instance.name))
2399         elif field == "admin_ram":
2400           val = instance.memory
2401         elif field == "oper_ram":
2402           if instance.primary_node in bad_nodes:
2403             val = None
2404           elif instance.name in live_data:
2405             val = live_data[instance.name].get("memory", "?")
2406           else:
2407             val = "-"
2408         elif field == "disk_template":
2409           val = instance.disk_template
2410         elif field == "ip":
2411           val = instance.nics[0].ip
2412         elif field == "bridge":
2413           val = instance.nics[0].bridge
2414         elif field == "mac":
2415           val = instance.nics[0].mac
2416         elif field == "sda_size" or field == "sdb_size":
2417           disk = instance.FindDisk(field[:3])
2418           if disk is None:
2419             val = None
2420           else:
2421             val = disk.size
2422         else:
2423           raise errors.ParameterError(field)
2424         iout.append(val)
2425       output.append(iout)
2426
2427     return output
2428
2429
2430 class LUFailoverInstance(LogicalUnit):
2431   """Failover an instance.
2432
2433   """
2434   HPATH = "instance-failover"
2435   HTYPE = constants.HTYPE_INSTANCE
2436   _OP_REQP = ["instance_name", "ignore_consistency"]
2437
2438   def BuildHooksEnv(self):
2439     """Build hooks env.
2440
2441     This runs on master, primary and secondary nodes of the instance.
2442
2443     """
2444     env = {
2445       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2446       }
2447     env.update(_BuildInstanceHookEnvByObject(self.instance))
2448     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2449     return env, nl, nl
2450
2451   def CheckPrereq(self):
2452     """Check prerequisites.
2453
2454     This checks that the instance is in the cluster.
2455
2456     """
2457     instance = self.cfg.GetInstanceInfo(
2458       self.cfg.ExpandInstanceName(self.op.instance_name))
2459     if instance is None:
2460       raise errors.OpPrereqError("Instance '%s' not known" %
2461                                  self.op.instance_name)
2462
2463     if instance.disk_template not in constants.DTS_NET_MIRROR:
2464       raise errors.OpPrereqError("Instance's disk layout is not"
2465                                  " network mirrored, cannot failover.")
2466
2467     secondary_nodes = instance.secondary_nodes
2468     if not secondary_nodes:
2469       raise errors.ProgrammerError("no secondary node but using "
2470                                    "DT_REMOTE_RAID1 template")
2471
2472     # check memory requirements on the secondary node
2473     target_node = secondary_nodes[0]
2474     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2475     info = nodeinfo.get(target_node, None)
2476     if not info:
2477       raise errors.OpPrereqError("Cannot get current information"
2478                                  " from node '%s'" % nodeinfo)
2479     if instance.memory > info['memory_free']:
2480       raise errors.OpPrereqError("Not enough memory on target node %s."
2481                                  " %d MB available, %d MB required" %
2482                                  (target_node, info['memory_free'],
2483                                   instance.memory))
2484
2485     # check bridge existance
2486     brlist = [nic.bridge for nic in instance.nics]
2487     if not rpc.call_bridges_exist(target_node, brlist):
2488       raise errors.OpPrereqError("One or more target bridges %s does not"
2489                                  " exist on destination node '%s'" %
2490                                  (brlist, target_node))
2491
2492     self.instance = instance
2493
2494   def Exec(self, feedback_fn):
2495     """Failover an instance.
2496
2497     The failover is done by shutting it down on its present node and
2498     starting it on the secondary.
2499
2500     """
2501     instance = self.instance
2502
2503     source_node = instance.primary_node
2504     target_node = instance.secondary_nodes[0]
2505
2506     feedback_fn("* checking disk consistency between source and target")
2507     for dev in instance.disks:
2508       # for remote_raid1, these are md over drbd
2509       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2510         if not self.op.ignore_consistency:
2511           raise errors.OpExecError("Disk %s is degraded on target node,"
2512                                    " aborting failover." % dev.iv_name)
2513
2514     feedback_fn("* checking target node resource availability")
2515     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2516
2517     if not nodeinfo:
2518       raise errors.OpExecError("Could not contact target node %s." %
2519                                target_node)
2520
2521     free_memory = int(nodeinfo[target_node]['memory_free'])
2522     memory = instance.memory
2523     if memory > free_memory:
2524       raise errors.OpExecError("Not enough memory to create instance %s on"
2525                                " node %s. needed %s MiB, available %s MiB" %
2526                                (instance.name, target_node, memory,
2527                                 free_memory))
2528
2529     feedback_fn("* shutting down instance on source node")
2530     logger.Info("Shutting down instance %s on node %s" %
2531                 (instance.name, source_node))
2532
2533     if not rpc.call_instance_shutdown(source_node, instance):
2534       if self.op.ignore_consistency:
2535         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2536                      " anyway. Please make sure node %s is down"  %
2537                      (instance.name, source_node, source_node))
2538       else:
2539         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2540                                  (instance.name, source_node))
2541
2542     feedback_fn("* deactivating the instance's disks on source node")
2543     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2544       raise errors.OpExecError("Can't shut down the instance's disks.")
2545
2546     instance.primary_node = target_node
2547     # distribute new instance config to the other nodes
2548     self.cfg.AddInstance(instance)
2549
2550     feedback_fn("* activating the instance's disks on target node")
2551     logger.Info("Starting instance %s on node %s" %
2552                 (instance.name, target_node))
2553
2554     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2555                                              ignore_secondaries=True)
2556     if not disks_ok:
2557       _ShutdownInstanceDisks(instance, self.cfg)
2558       raise errors.OpExecError("Can't activate the instance's disks")
2559
2560     feedback_fn("* starting the instance on the target node")
2561     if not rpc.call_instance_start(target_node, instance, None):
2562       _ShutdownInstanceDisks(instance, self.cfg)
2563       raise errors.OpExecError("Could not start instance %s on node %s." %
2564                                (instance.name, target_node))
2565
2566
2567 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2568   """Create a tree of block devices on the primary node.
2569
2570   This always creates all devices.
2571
2572   """
2573   if device.children:
2574     for child in device.children:
2575       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2576         return False
2577
2578   cfg.SetDiskID(device, node)
2579   new_id = rpc.call_blockdev_create(node, device, device.size,
2580                                     instance.name, True, info)
2581   if not new_id:
2582     return False
2583   if device.physical_id is None:
2584     device.physical_id = new_id
2585   return True
2586
2587
2588 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2589   """Create a tree of block devices on a secondary node.
2590
2591   If this device type has to be created on secondaries, create it and
2592   all its children.
2593
2594   If not, just recurse to children keeping the same 'force' value.
2595
2596   """
2597   if device.CreateOnSecondary():
2598     force = True
2599   if device.children:
2600     for child in device.children:
2601       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2602                                         child, force, info):
2603         return False
2604
2605   if not force:
2606     return True
2607   cfg.SetDiskID(device, node)
2608   new_id = rpc.call_blockdev_create(node, device, device.size,
2609                                     instance.name, False, info)
2610   if not new_id:
2611     return False
2612   if device.physical_id is None:
2613     device.physical_id = new_id
2614   return True
2615
2616
2617 def _GenerateUniqueNames(cfg, exts):
2618   """Generate a suitable LV name.
2619
2620   This will generate a logical volume name for the given instance.
2621
2622   """
2623   results = []
2624   for val in exts:
2625     new_id = cfg.GenerateUniqueID()
2626     results.append("%s%s" % (new_id, val))
2627   return results
2628
2629
2630 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2631   """Generate a drbd device complete with its children.
2632
2633   """
2634   port = cfg.AllocatePort()
2635   vgname = cfg.GetVGName()
2636   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2637                           logical_id=(vgname, names[0]))
2638   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2639                           logical_id=(vgname, names[1]))
2640   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2641                           logical_id = (primary, secondary, port),
2642                           children = [dev_data, dev_meta])
2643   return drbd_dev
2644
2645
2646 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2647   """Generate a drbd8 device complete with its children.
2648
2649   """
2650   port = cfg.AllocatePort()
2651   vgname = cfg.GetVGName()
2652   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2653                           logical_id=(vgname, names[0]))
2654   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2655                           logical_id=(vgname, names[1]))
2656   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2657                           logical_id = (primary, secondary, port),
2658                           children = [dev_data, dev_meta],
2659                           iv_name=iv_name)
2660   return drbd_dev
2661
2662 def _GenerateDiskTemplate(cfg, template_name,
2663                           instance_name, primary_node,
2664                           secondary_nodes, disk_sz, swap_sz):
2665   """Generate the entire disk layout for a given template type.
2666
2667   """
2668   #TODO: compute space requirements
2669
2670   vgname = cfg.GetVGName()
2671   if template_name == "diskless":
2672     disks = []
2673   elif template_name == "plain":
2674     if len(secondary_nodes) != 0:
2675       raise errors.ProgrammerError("Wrong template configuration")
2676
2677     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2678     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2679                            logical_id=(vgname, names[0]),
2680                            iv_name = "sda")
2681     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2682                            logical_id=(vgname, names[1]),
2683                            iv_name = "sdb")
2684     disks = [sda_dev, sdb_dev]
2685   elif template_name == "local_raid1":
2686     if len(secondary_nodes) != 0:
2687       raise errors.ProgrammerError("Wrong template configuration")
2688
2689
2690     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2691                                        ".sdb_m1", ".sdb_m2"])
2692     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2693                               logical_id=(vgname, names[0]))
2694     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2695                               logical_id=(vgname, names[1]))
2696     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2697                               size=disk_sz,
2698                               children = [sda_dev_m1, sda_dev_m2])
2699     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2700                               logical_id=(vgname, names[2]))
2701     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2702                               logical_id=(vgname, names[3]))
2703     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2704                               size=swap_sz,
2705                               children = [sdb_dev_m1, sdb_dev_m2])
2706     disks = [md_sda_dev, md_sdb_dev]
2707   elif template_name == constants.DT_REMOTE_RAID1:
2708     if len(secondary_nodes) != 1:
2709       raise errors.ProgrammerError("Wrong template configuration")
2710     remote_node = secondary_nodes[0]
2711     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2712                                        ".sdb_data", ".sdb_meta"])
2713     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2714                                          disk_sz, names[0:2])
2715     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2716                               children = [drbd_sda_dev], size=disk_sz)
2717     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2718                                          swap_sz, names[2:4])
2719     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2720                               children = [drbd_sdb_dev], size=swap_sz)
2721     disks = [md_sda_dev, md_sdb_dev]
2722   elif template_name == constants.DT_DRBD8:
2723     if len(secondary_nodes) != 1:
2724       raise errors.ProgrammerError("Wrong template configuration")
2725     remote_node = secondary_nodes[0]
2726     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2727                                        ".sdb_data", ".sdb_meta"])
2728     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2729                                          disk_sz, names[0:2], "sda")
2730     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2731                                          swap_sz, names[2:4], "sdb")
2732     disks = [drbd_sda_dev, drbd_sdb_dev]
2733   else:
2734     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2735   return disks
2736
2737
2738 def _GetInstanceInfoText(instance):
2739   """Compute that text that should be added to the disk's metadata.
2740
2741   """
2742   return "originstname+%s" % instance.name
2743
2744
2745 def _CreateDisks(cfg, instance):
2746   """Create all disks for an instance.
2747
2748   This abstracts away some work from AddInstance.
2749
2750   Args:
2751     instance: the instance object
2752
2753   Returns:
2754     True or False showing the success of the creation process
2755
2756   """
2757   info = _GetInstanceInfoText(instance)
2758
2759   for device in instance.disks:
2760     logger.Info("creating volume %s for instance %s" %
2761               (device.iv_name, instance.name))
2762     #HARDCODE
2763     for secondary_node in instance.secondary_nodes:
2764       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2765                                         device, False, info):
2766         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2767                      (device.iv_name, device, secondary_node))
2768         return False
2769     #HARDCODE
2770     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2771                                     instance, device, info):
2772       logger.Error("failed to create volume %s on primary!" %
2773                    device.iv_name)
2774       return False
2775   return True
2776
2777
2778 def _RemoveDisks(instance, cfg):
2779   """Remove all disks for an instance.
2780
2781   This abstracts away some work from `AddInstance()` and
2782   `RemoveInstance()`. Note that in case some of the devices couldn't
2783   be removed, the removal will continue with the other ones (compare
2784   with `_CreateDisks()`).
2785
2786   Args:
2787     instance: the instance object
2788
2789   Returns:
2790     True or False showing the success of the removal proces
2791
2792   """
2793   logger.Info("removing block devices for instance %s" % instance.name)
2794
2795   result = True
2796   for device in instance.disks:
2797     for node, disk in device.ComputeNodeTree(instance.primary_node):
2798       cfg.SetDiskID(disk, node)
2799       if not rpc.call_blockdev_remove(node, disk):
2800         logger.Error("could not remove block device %s on node %s,"
2801                      " continuing anyway" %
2802                      (device.iv_name, node))
2803         result = False
2804   return result
2805
2806
2807 class LUCreateInstance(LogicalUnit):
2808   """Create an instance.
2809
2810   """
2811   HPATH = "instance-add"
2812   HTYPE = constants.HTYPE_INSTANCE
2813   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2814               "disk_template", "swap_size", "mode", "start", "vcpus",
2815               "wait_for_sync", "ip_check"]
2816
2817   def BuildHooksEnv(self):
2818     """Build hooks env.
2819
2820     This runs on master, primary and secondary nodes of the instance.
2821
2822     """
2823     env = {
2824       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2825       "INSTANCE_DISK_SIZE": self.op.disk_size,
2826       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2827       "INSTANCE_ADD_MODE": self.op.mode,
2828       }
2829     if self.op.mode == constants.INSTANCE_IMPORT:
2830       env["INSTANCE_SRC_NODE"] = self.op.src_node
2831       env["INSTANCE_SRC_PATH"] = self.op.src_path
2832       env["INSTANCE_SRC_IMAGE"] = self.src_image
2833
2834     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2835       primary_node=self.op.pnode,
2836       secondary_nodes=self.secondaries,
2837       status=self.instance_status,
2838       os_type=self.op.os_type,
2839       memory=self.op.mem_size,
2840       vcpus=self.op.vcpus,
2841       nics=[(self.inst_ip, self.op.bridge)],
2842     ))
2843
2844     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2845           self.secondaries)
2846     return env, nl, nl
2847
2848
2849   def CheckPrereq(self):
2850     """Check prerequisites.
2851
2852     """
2853     if self.op.mode not in (constants.INSTANCE_CREATE,
2854                             constants.INSTANCE_IMPORT):
2855       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2856                                  self.op.mode)
2857
2858     if self.op.mode == constants.INSTANCE_IMPORT:
2859       src_node = getattr(self.op, "src_node", None)
2860       src_path = getattr(self.op, "src_path", None)
2861       if src_node is None or src_path is None:
2862         raise errors.OpPrereqError("Importing an instance requires source"
2863                                    " node and path options")
2864       src_node_full = self.cfg.ExpandNodeName(src_node)
2865       if src_node_full is None:
2866         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2867       self.op.src_node = src_node = src_node_full
2868
2869       if not os.path.isabs(src_path):
2870         raise errors.OpPrereqError("The source path must be absolute")
2871
2872       export_info = rpc.call_export_info(src_node, src_path)
2873
2874       if not export_info:
2875         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2876
2877       if not export_info.has_section(constants.INISECT_EXP):
2878         raise errors.ProgrammerError("Corrupted export config")
2879
2880       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2881       if (int(ei_version) != constants.EXPORT_VERSION):
2882         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2883                                    (ei_version, constants.EXPORT_VERSION))
2884
2885       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2886         raise errors.OpPrereqError("Can't import instance with more than"
2887                                    " one data disk")
2888
2889       # FIXME: are the old os-es, disk sizes, etc. useful?
2890       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2891       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2892                                                          'disk0_dump'))
2893       self.src_image = diskimage
2894     else: # INSTANCE_CREATE
2895       if getattr(self.op, "os_type", None) is None:
2896         raise errors.OpPrereqError("No guest OS specified")
2897
2898     # check primary node
2899     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2900     if pnode is None:
2901       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2902                                  self.op.pnode)
2903     self.op.pnode = pnode.name
2904     self.pnode = pnode
2905     self.secondaries = []
2906     # disk template and mirror node verification
2907     if self.op.disk_template not in constants.DISK_TEMPLATES:
2908       raise errors.OpPrereqError("Invalid disk template name")
2909
2910     if self.op.disk_template in constants.DTS_NET_MIRROR:
2911       if getattr(self.op, "snode", None) is None:
2912         raise errors.OpPrereqError("The networked disk templates need"
2913                                    " a mirror node")
2914
2915       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2916       if snode_name is None:
2917         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2918                                    self.op.snode)
2919       elif snode_name == pnode.name:
2920         raise errors.OpPrereqError("The secondary node cannot be"
2921                                    " the primary node.")
2922       self.secondaries.append(snode_name)
2923
2924     # Check lv size requirements
2925     nodenames = [pnode.name] + self.secondaries
2926     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2927
2928     # Required free disk space as a function of disk and swap space
2929     req_size_dict = {
2930       constants.DT_DISKLESS: 0,
2931       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2932       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2933       # 256 MB are added for drbd metadata, 128MB for each drbd device
2934       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2935       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2936     }
2937
2938     if self.op.disk_template not in req_size_dict:
2939       raise errors.ProgrammerError("Disk template '%s' size requirement"
2940                                    " is unknown" %  self.op.disk_template)
2941
2942     req_size = req_size_dict[self.op.disk_template]
2943
2944     for node in nodenames:
2945       info = nodeinfo.get(node, None)
2946       if not info:
2947         raise errors.OpPrereqError("Cannot get current information"
2948                                    " from node '%s'" % nodeinfo)
2949       if req_size > info['vg_free']:
2950         raise errors.OpPrereqError("Not enough disk space on target node %s."
2951                                    " %d MB available, %d MB required" %
2952                                    (node, info['vg_free'], req_size))
2953
2954     # os verification
2955     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2956     if not os_obj:
2957       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2958                                  " primary node"  % self.op.os_type)
2959
2960     # instance verification
2961     hostname1 = utils.HostInfo(self.op.instance_name)
2962
2963     self.op.instance_name = instance_name = hostname1.name
2964     instance_list = self.cfg.GetInstanceList()
2965     if instance_name in instance_list:
2966       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2967                                  instance_name)
2968
2969     ip = getattr(self.op, "ip", None)
2970     if ip is None or ip.lower() == "none":
2971       inst_ip = None
2972     elif ip.lower() == "auto":
2973       inst_ip = hostname1.ip
2974     else:
2975       if not utils.IsValidIP(ip):
2976         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2977                                    " like a valid IP" % ip)
2978       inst_ip = ip
2979     self.inst_ip = inst_ip
2980
2981     if self.op.start and not self.op.ip_check:
2982       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2983                                  " adding an instance in start mode")
2984
2985     if self.op.ip_check:
2986       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2987                        constants.DEFAULT_NODED_PORT):
2988         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2989                                    (hostname1.ip, instance_name))
2990
2991     # bridge verification
2992     bridge = getattr(self.op, "bridge", None)
2993     if bridge is None:
2994       self.op.bridge = self.cfg.GetDefBridge()
2995     else:
2996       self.op.bridge = bridge
2997
2998     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2999       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3000                                  " destination node '%s'" %
3001                                  (self.op.bridge, pnode.name))
3002
3003     if self.op.start:
3004       self.instance_status = 'up'
3005     else:
3006       self.instance_status = 'down'
3007
3008   def Exec(self, feedback_fn):
3009     """Create and add the instance to the cluster.
3010
3011     """
3012     instance = self.op.instance_name
3013     pnode_name = self.pnode.name
3014
3015     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
3016     if self.inst_ip is not None:
3017       nic.ip = self.inst_ip
3018
3019     disks = _GenerateDiskTemplate(self.cfg,
3020                                   self.op.disk_template,
3021                                   instance, pnode_name,
3022                                   self.secondaries, self.op.disk_size,
3023                                   self.op.swap_size)
3024
3025     iobj = objects.Instance(name=instance, os=self.op.os_type,
3026                             primary_node=pnode_name,
3027                             memory=self.op.mem_size,
3028                             vcpus=self.op.vcpus,
3029                             nics=[nic], disks=disks,
3030                             disk_template=self.op.disk_template,
3031                             status=self.instance_status,
3032                             )
3033
3034     feedback_fn("* creating instance disks...")
3035     if not _CreateDisks(self.cfg, iobj):
3036       _RemoveDisks(iobj, self.cfg)
3037       raise errors.OpExecError("Device creation failed, reverting...")
3038
3039     feedback_fn("adding instance %s to cluster config" % instance)
3040
3041     self.cfg.AddInstance(iobj)
3042
3043     if self.op.wait_for_sync:
3044       disk_abort = not _WaitForSync(self.cfg, iobj)
3045     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3046       # make sure the disks are not degraded (still sync-ing is ok)
3047       time.sleep(15)
3048       feedback_fn("* checking mirrors status")
3049       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
3050     else:
3051       disk_abort = False
3052
3053     if disk_abort:
3054       _RemoveDisks(iobj, self.cfg)
3055       self.cfg.RemoveInstance(iobj.name)
3056       raise errors.OpExecError("There are some degraded disks for"
3057                                " this instance")
3058
3059     feedback_fn("creating os for instance %s on node %s" %
3060                 (instance, pnode_name))
3061
3062     if iobj.disk_template != constants.DT_DISKLESS:
3063       if self.op.mode == constants.INSTANCE_CREATE:
3064         feedback_fn("* running the instance OS create scripts...")
3065         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3066           raise errors.OpExecError("could not add os for instance %s"
3067                                    " on node %s" %
3068                                    (instance, pnode_name))
3069
3070       elif self.op.mode == constants.INSTANCE_IMPORT:
3071         feedback_fn("* running the instance OS import scripts...")
3072         src_node = self.op.src_node
3073         src_image = self.src_image
3074         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3075                                                 src_node, src_image):
3076           raise errors.OpExecError("Could not import os for instance"
3077                                    " %s on node %s" %
3078                                    (instance, pnode_name))
3079       else:
3080         # also checked in the prereq part
3081         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3082                                      % self.op.mode)
3083
3084     if self.op.start:
3085       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3086       feedback_fn("* starting instance...")
3087       if not rpc.call_instance_start(pnode_name, iobj, None):
3088         raise errors.OpExecError("Could not start instance")
3089
3090
3091 class LUConnectConsole(NoHooksLU):
3092   """Connect to an instance's console.
3093
3094   This is somewhat special in that it returns the command line that
3095   you need to run on the master node in order to connect to the
3096   console.
3097
3098   """
3099   _OP_REQP = ["instance_name"]
3100
3101   def CheckPrereq(self):
3102     """Check prerequisites.
3103
3104     This checks that the instance is in the cluster.
3105
3106     """
3107     instance = self.cfg.GetInstanceInfo(
3108       self.cfg.ExpandInstanceName(self.op.instance_name))
3109     if instance is None:
3110       raise errors.OpPrereqError("Instance '%s' not known" %
3111                                  self.op.instance_name)
3112     self.instance = instance
3113
3114   def Exec(self, feedback_fn):
3115     """Connect to the console of an instance
3116
3117     """
3118     instance = self.instance
3119     node = instance.primary_node
3120
3121     node_insts = rpc.call_instance_list([node])[node]
3122     if node_insts is False:
3123       raise errors.OpExecError("Can't connect to node %s." % node)
3124
3125     if instance.name not in node_insts:
3126       raise errors.OpExecError("Instance %s is not running." % instance.name)
3127
3128     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3129
3130     hyper = hypervisor.GetHypervisor()
3131     console_cmd = hyper.GetShellCommandForConsole(instance.name)
3132     # build ssh cmdline
3133     argv = ["ssh", "-q", "-t"]
3134     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3135     argv.extend(ssh.BATCH_MODE_OPTS)
3136     argv.append(node)
3137     argv.append(console_cmd)
3138     return "ssh", argv
3139
3140
3141 class LUAddMDDRBDComponent(LogicalUnit):
3142   """Adda new mirror member to an instance's disk.
3143
3144   """
3145   HPATH = "mirror-add"
3146   HTYPE = constants.HTYPE_INSTANCE
3147   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3148
3149   def BuildHooksEnv(self):
3150     """Build hooks env.
3151
3152     This runs on the master, the primary and all the secondaries.
3153
3154     """
3155     env = {
3156       "NEW_SECONDARY": self.op.remote_node,
3157       "DISK_NAME": self.op.disk_name,
3158       }
3159     env.update(_BuildInstanceHookEnvByObject(self.instance))
3160     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3161           self.op.remote_node,] + list(self.instance.secondary_nodes)
3162     return env, nl, nl
3163
3164   def CheckPrereq(self):
3165     """Check prerequisites.
3166
3167     This checks that the instance is in the cluster.
3168
3169     """
3170     instance = self.cfg.GetInstanceInfo(
3171       self.cfg.ExpandInstanceName(self.op.instance_name))
3172     if instance is None:
3173       raise errors.OpPrereqError("Instance '%s' not known" %
3174                                  self.op.instance_name)
3175     self.instance = instance
3176
3177     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3178     if remote_node is None:
3179       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3180     self.remote_node = remote_node
3181
3182     if remote_node == instance.primary_node:
3183       raise errors.OpPrereqError("The specified node is the primary node of"
3184                                  " the instance.")
3185
3186     if instance.disk_template != constants.DT_REMOTE_RAID1:
3187       raise errors.OpPrereqError("Instance's disk layout is not"
3188                                  " remote_raid1.")
3189     for disk in instance.disks:
3190       if disk.iv_name == self.op.disk_name:
3191         break
3192     else:
3193       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3194                                  " instance." % self.op.disk_name)
3195     if len(disk.children) > 1:
3196       raise errors.OpPrereqError("The device already has two slave"
3197                                  " devices.\n"
3198                                  "This would create a 3-disk raid1"
3199                                  " which we don't allow.")
3200     self.disk = disk
3201
3202   def Exec(self, feedback_fn):
3203     """Add the mirror component
3204
3205     """
3206     disk = self.disk
3207     instance = self.instance
3208
3209     remote_node = self.remote_node
3210     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3211     names = _GenerateUniqueNames(self.cfg, lv_names)
3212     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3213                                      remote_node, disk.size, names)
3214
3215     logger.Info("adding new mirror component on secondary")
3216     #HARDCODE
3217     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3218                                       new_drbd, False,
3219                                       _GetInstanceInfoText(instance)):
3220       raise errors.OpExecError("Failed to create new component on secondary"
3221                                " node %s" % remote_node)
3222
3223     logger.Info("adding new mirror component on primary")
3224     #HARDCODE
3225     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3226                                     instance, new_drbd,
3227                                     _GetInstanceInfoText(instance)):
3228       # remove secondary dev
3229       self.cfg.SetDiskID(new_drbd, remote_node)
3230       rpc.call_blockdev_remove(remote_node, new_drbd)
3231       raise errors.OpExecError("Failed to create volume on primary")
3232
3233     # the device exists now
3234     # call the primary node to add the mirror to md
3235     logger.Info("adding new mirror component to md")
3236     if not rpc.call_blockdev_addchildren(instance.primary_node,
3237                                          disk, [new_drbd]):
3238       logger.Error("Can't add mirror compoment to md!")
3239       self.cfg.SetDiskID(new_drbd, remote_node)
3240       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3241         logger.Error("Can't rollback on secondary")
3242       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3243       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3244         logger.Error("Can't rollback on primary")
3245       raise errors.OpExecError("Can't add mirror component to md array")
3246
3247     disk.children.append(new_drbd)
3248
3249     self.cfg.AddInstance(instance)
3250
3251     _WaitForSync(self.cfg, instance)
3252
3253     return 0
3254
3255
3256 class LURemoveMDDRBDComponent(LogicalUnit):
3257   """Remove a component from a remote_raid1 disk.
3258
3259   """
3260   HPATH = "mirror-remove"
3261   HTYPE = constants.HTYPE_INSTANCE
3262   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3263
3264   def BuildHooksEnv(self):
3265     """Build hooks env.
3266
3267     This runs on the master, the primary and all the secondaries.
3268
3269     """
3270     env = {
3271       "DISK_NAME": self.op.disk_name,
3272       "DISK_ID": self.op.disk_id,
3273       "OLD_SECONDARY": self.old_secondary,
3274       }
3275     env.update(_BuildInstanceHookEnvByObject(self.instance))
3276     nl = [self.sstore.GetMasterNode(),
3277           self.instance.primary_node] + list(self.instance.secondary_nodes)
3278     return env, nl, nl
3279
3280   def CheckPrereq(self):
3281     """Check prerequisites.
3282
3283     This checks that the instance is in the cluster.
3284
3285     """
3286     instance = self.cfg.GetInstanceInfo(
3287       self.cfg.ExpandInstanceName(self.op.instance_name))
3288     if instance is None:
3289       raise errors.OpPrereqError("Instance '%s' not known" %
3290                                  self.op.instance_name)
3291     self.instance = instance
3292
3293     if instance.disk_template != constants.DT_REMOTE_RAID1:
3294       raise errors.OpPrereqError("Instance's disk layout is not"
3295                                  " remote_raid1.")
3296     for disk in instance.disks:
3297       if disk.iv_name == self.op.disk_name:
3298         break
3299     else:
3300       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3301                                  " instance." % self.op.disk_name)
3302     for child in disk.children:
3303       if (child.dev_type == constants.LD_DRBD7 and
3304           child.logical_id[2] == self.op.disk_id):
3305         break
3306     else:
3307       raise errors.OpPrereqError("Can't find the device with this port.")
3308
3309     if len(disk.children) < 2:
3310       raise errors.OpPrereqError("Cannot remove the last component from"
3311                                  " a mirror.")
3312     self.disk = disk
3313     self.child = child
3314     if self.child.logical_id[0] == instance.primary_node:
3315       oid = 1
3316     else:
3317       oid = 0
3318     self.old_secondary = self.child.logical_id[oid]
3319
3320   def Exec(self, feedback_fn):
3321     """Remove the mirror component
3322
3323     """
3324     instance = self.instance
3325     disk = self.disk
3326     child = self.child
3327     logger.Info("remove mirror component")
3328     self.cfg.SetDiskID(disk, instance.primary_node)
3329     if not rpc.call_blockdev_removechildren(instance.primary_node,
3330                                             disk, [child]):
3331       raise errors.OpExecError("Can't remove child from mirror.")
3332
3333     for node in child.logical_id[:2]:
3334       self.cfg.SetDiskID(child, node)
3335       if not rpc.call_blockdev_remove(node, child):
3336         logger.Error("Warning: failed to remove device from node %s,"
3337                      " continuing operation." % node)
3338
3339     disk.children.remove(child)
3340     self.cfg.AddInstance(instance)
3341
3342
3343 class LUReplaceDisks(LogicalUnit):
3344   """Replace the disks of an instance.
3345
3346   """
3347   HPATH = "mirrors-replace"
3348   HTYPE = constants.HTYPE_INSTANCE
3349   _OP_REQP = ["instance_name", "mode", "disks"]
3350
3351   def BuildHooksEnv(self):
3352     """Build hooks env.
3353
3354     This runs on the master, the primary and all the secondaries.
3355
3356     """
3357     env = {
3358       "MODE": self.op.mode,
3359       "NEW_SECONDARY": self.op.remote_node,
3360       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3361       }
3362     env.update(_BuildInstanceHookEnvByObject(self.instance))
3363     nl = [self.sstore.GetMasterNode(),
3364           self.instance.primary_node] + list(self.instance.secondary_nodes)
3365     return env, nl, nl
3366
3367   def CheckPrereq(self):
3368     """Check prerequisites.
3369
3370     This checks that the instance is in the cluster.
3371
3372     """
3373     instance = self.cfg.GetInstanceInfo(
3374       self.cfg.ExpandInstanceName(self.op.instance_name))
3375     if instance is None:
3376       raise errors.OpPrereqError("Instance '%s' not known" %
3377                                  self.op.instance_name)
3378     self.instance = instance
3379
3380     if instance.disk_template not in constants.DTS_NET_MIRROR:
3381       raise errors.OpPrereqError("Instance's disk layout is not"
3382                                  " network mirrored.")
3383
3384     if len(instance.secondary_nodes) != 1:
3385       raise errors.OpPrereqError("The instance has a strange layout,"
3386                                  " expected one secondary but found %d" %
3387                                  len(instance.secondary_nodes))
3388
3389     self.sec_node = instance.secondary_nodes[0]
3390
3391     remote_node = getattr(self.op, "remote_node", None)
3392     if remote_node is not None:
3393       remote_node = self.cfg.ExpandNodeName(remote_node)
3394       if remote_node is None:
3395         raise errors.OpPrereqError("Node '%s' not known" %
3396                                    self.op.remote_node)
3397       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3398     else:
3399       self.remote_node_info = None
3400     if remote_node == instance.primary_node:
3401       raise errors.OpPrereqError("The specified node is the primary node of"
3402                                  " the instance.")
3403     elif remote_node == self.sec_node:
3404       # the user gave the current secondary, switch to
3405       # 'no-replace-secondary' mode
3406       remote_node = None
3407     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3408         self.op.mode != constants.REPLACE_DISK_ALL):
3409       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3410                                  " disks replacement, not individual ones")
3411     if instance.disk_template == constants.DT_DRBD8:
3412       if self.op.mode == constants.REPLACE_DISK_ALL:
3413         raise errors.OpPrereqError("Template 'drbd8' only allows primary or"
3414                                    " secondary disk replacement, not"
3415                                    " both at once")
3416       elif self.op.mode == constants.REPLACE_DISK_PRI:
3417         if remote_node is not None:
3418           raise errors.OpPrereqError("Template 'drbd8' does not allow changing"
3419                                      " the secondary while doing a primary"
3420                                      " node disk replacement")
3421         self.tgt_node = instance.primary_node
3422         self.oth_node = instance.secondary_nodes[0]
3423       elif self.op.mode == constants.REPLACE_DISK_SEC:
3424         self.new_node = remote_node # this can be None, in which case
3425                                     # we don't change the secondary
3426         self.tgt_node = instance.secondary_nodes[0]
3427         self.oth_node = instance.primary_node
3428       else:
3429         raise errors.ProgrammerError("Unhandled disk replace mode")
3430
3431     for name in self.op.disks:
3432       if instance.FindDisk(name) is None:
3433         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3434                                    (name, instance.name))
3435     self.op.remote_node = remote_node
3436
3437   def _ExecRR1(self, feedback_fn):
3438     """Replace the disks of an instance.
3439
3440     """
3441     instance = self.instance
3442     iv_names = {}
3443     # start of work
3444     if self.op.remote_node is None:
3445       remote_node = self.sec_node
3446     else:
3447       remote_node = self.op.remote_node
3448     cfg = self.cfg
3449     for dev in instance.disks:
3450       size = dev.size
3451       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3452       names = _GenerateUniqueNames(cfg, lv_names)
3453       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3454                                        remote_node, size, names)
3455       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3456       logger.Info("adding new mirror component on secondary for %s" %
3457                   dev.iv_name)
3458       #HARDCODE
3459       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3460                                         new_drbd, False,
3461                                         _GetInstanceInfoText(instance)):
3462         raise errors.OpExecError("Failed to create new component on"
3463                                  " secondary node %s\n"
3464                                  "Full abort, cleanup manually!" %
3465                                  remote_node)
3466
3467       logger.Info("adding new mirror component on primary")
3468       #HARDCODE
3469       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3470                                       instance, new_drbd,
3471                                       _GetInstanceInfoText(instance)):
3472         # remove secondary dev
3473         cfg.SetDiskID(new_drbd, remote_node)
3474         rpc.call_blockdev_remove(remote_node, new_drbd)
3475         raise errors.OpExecError("Failed to create volume on primary!\n"
3476                                  "Full abort, cleanup manually!!")
3477
3478       # the device exists now
3479       # call the primary node to add the mirror to md
3480       logger.Info("adding new mirror component to md")
3481       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3482                                            [new_drbd]):
3483         logger.Error("Can't add mirror compoment to md!")
3484         cfg.SetDiskID(new_drbd, remote_node)
3485         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3486           logger.Error("Can't rollback on secondary")
3487         cfg.SetDiskID(new_drbd, instance.primary_node)
3488         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3489           logger.Error("Can't rollback on primary")
3490         raise errors.OpExecError("Full abort, cleanup manually!!")
3491
3492       dev.children.append(new_drbd)
3493       cfg.AddInstance(instance)
3494
3495     # this can fail as the old devices are degraded and _WaitForSync
3496     # does a combined result over all disks, so we don't check its
3497     # return value
3498     _WaitForSync(cfg, instance, unlock=True)
3499
3500     # so check manually all the devices
3501     for name in iv_names:
3502       dev, child, new_drbd = iv_names[name]
3503       cfg.SetDiskID(dev, instance.primary_node)
3504       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3505       if is_degr:
3506         raise errors.OpExecError("MD device %s is degraded!" % name)
3507       cfg.SetDiskID(new_drbd, instance.primary_node)
3508       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3509       if is_degr:
3510         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3511
3512     for name in iv_names:
3513       dev, child, new_drbd = iv_names[name]
3514       logger.Info("remove mirror %s component" % name)
3515       cfg.SetDiskID(dev, instance.primary_node)
3516       if not rpc.call_blockdev_removechildren(instance.primary_node,
3517                                               dev, [child]):
3518         logger.Error("Can't remove child from mirror, aborting"
3519                      " *this device cleanup*.\nYou need to cleanup manually!!")
3520         continue
3521
3522       for node in child.logical_id[:2]:
3523         logger.Info("remove child device on %s" % node)
3524         cfg.SetDiskID(child, node)
3525         if not rpc.call_blockdev_remove(node, child):
3526           logger.Error("Warning: failed to remove device from node %s,"
3527                        " continuing operation." % node)
3528
3529       dev.children.remove(child)
3530
3531       cfg.AddInstance(instance)
3532
3533   def _ExecD8DiskOnly(self, feedback_fn):
3534     """Replace a disk on the primary or secondary for dbrd8.
3535
3536     The algorithm for replace is quite complicated:
3537       - for each disk to be replaced:
3538         - create new LVs on the target node with unique names
3539         - detach old LVs from the drbd device
3540         - rename old LVs to name_replaced.<time_t>
3541         - rename new LVs to old LVs
3542         - attach the new LVs (with the old names now) to the drbd device
3543       - wait for sync across all devices
3544       - for each modified disk:
3545         - remove old LVs (which have the name name_replaces.<time_t>)
3546
3547     Failures are not very well handled.
3548
3549     """
3550     steps_total = 6
3551     warning, info = (self.processor.LogWarning, self.processor.LogInfo)
3552     instance = self.instance
3553     iv_names = {}
3554     vgname = self.cfg.GetVGName()
3555     # start of work
3556     cfg = self.cfg
3557     tgt_node = self.tgt_node
3558     oth_node = self.oth_node
3559
3560     # Step: check device activation
3561     self.processor.LogStep(1, steps_total, "check device existence")
3562     info("checking volume groups")
3563     my_vg = cfg.GetVGName()
3564     results = rpc.call_vg_list([oth_node, tgt_node])
3565     if not results:
3566       raise errors.OpExecError("Can't list volume groups on the nodes")
3567     for node in oth_node, tgt_node:
3568       res = results.get(node, False)
3569       if not res or my_vg not in res:
3570         raise errors.OpExecError("Volume group '%s' not found on %s" %
3571                                  (my_vg, node))
3572     for dev in instance.disks:
3573       if not dev.iv_name in self.op.disks:
3574         continue
3575       for node in tgt_node, oth_node:
3576         info("checking %s on %s" % (dev.iv_name, node))
3577         cfg.SetDiskID(dev, node)
3578         if not rpc.call_blockdev_find(node, dev):
3579           raise errors.OpExecError("Can't find device %s on node %s" %
3580                                    (dev.iv_name, node))
3581
3582     # Step: check other node consistency
3583     self.processor.LogStep(2, steps_total, "check peer consistency")
3584     for dev in instance.disks:
3585       if not dev.iv_name in self.op.disks:
3586         continue
3587       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3588       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3589                                    oth_node==instance.primary_node):
3590         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3591                                  " to replace disks on this node (%s)" %
3592                                  (oth_node, tgt_node))
3593
3594     # Step: create new storage
3595     self.processor.LogStep(3, steps_total, "allocate new storage")
3596     for dev in instance.disks:
3597       if not dev.iv_name in self.op.disks:
3598         continue
3599       size = dev.size
3600       cfg.SetDiskID(dev, tgt_node)
3601       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3602       names = _GenerateUniqueNames(cfg, lv_names)
3603       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3604                              logical_id=(vgname, names[0]))
3605       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3606                              logical_id=(vgname, names[1]))
3607       new_lvs = [lv_data, lv_meta]
3608       old_lvs = dev.children
3609       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3610       info("creating new local storage on %s for %s" %
3611            (tgt_node, dev.iv_name))
3612       # since we *always* want to create this LV, we use the
3613       # _Create...OnPrimary (which forces the creation), even if we
3614       # are talking about the secondary node
3615       for new_lv in new_lvs:
3616         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3617                                         _GetInstanceInfoText(instance)):
3618           raise errors.OpExecError("Failed to create new LV named '%s' on"
3619                                    " node '%s'" %
3620                                    (new_lv.logical_id[1], tgt_node))
3621
3622     # Step: for each lv, detach+rename*2+attach
3623     self.processor.LogStep(4, steps_total, "change drbd configuration")
3624     for dev, old_lvs, new_lvs in iv_names.itervalues():
3625       info("detaching %s drbd from local storage" % dev.iv_name)
3626       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3627         raise errors.OpExecError("Can't detach drbd from local storage on node"
3628                                  " %s for device %s" % (tgt_node, dev.iv_name))
3629       #dev.children = []
3630       #cfg.Update(instance)
3631
3632       # ok, we created the new LVs, so now we know we have the needed
3633       # storage; as such, we proceed on the target node to rename
3634       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3635       # using the assumption than logical_id == physical_id (which in
3636       # turn is the unique_id on that node)
3637
3638       # FIXME(iustin): use a better name for the replaced LVs
3639       temp_suffix = int(time.time())
3640       ren_fn = lambda d, suff: (d.physical_id[0],
3641                                 d.physical_id[1] + "_replaced-%s" % suff)
3642       # build the rename list based on what LVs exist on the node
3643       rlist = []
3644       for to_ren in old_lvs:
3645         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3646         if find_res is not None: # device exists
3647           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3648
3649       info("renaming the old LVs on the target node")
3650       if not rpc.call_blockdev_rename(tgt_node, rlist):
3651         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3652       # now we rename the new LVs to the old LVs
3653       info("renaming the new LVs on the target node")
3654       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3655       if not rpc.call_blockdev_rename(tgt_node, rlist):
3656         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3657
3658       for old, new in zip(old_lvs, new_lvs):
3659         new.logical_id = old.logical_id
3660         cfg.SetDiskID(new, tgt_node)
3661
3662       for disk in old_lvs:
3663         disk.logical_id = ren_fn(disk, temp_suffix)
3664         cfg.SetDiskID(disk, tgt_node)
3665
3666       # now that the new lvs have the old name, we can add them to the device
3667       info("adding new mirror component on %s" % tgt_node)
3668       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3669         for new_lv in new_lvs:
3670           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3671             warning("Can't rollback device %s", "manually cleanup unused"
3672                     " logical volumes")
3673         raise errors.OpExecError("Can't add local storage to drbd")
3674
3675       dev.children = new_lvs
3676       cfg.Update(instance)
3677
3678     # Step: wait for sync
3679
3680     # this can fail as the old devices are degraded and _WaitForSync
3681     # does a combined result over all disks, so we don't check its
3682     # return value
3683     self.processor.LogStep(5, steps_total, "sync devices")
3684     _WaitForSync(cfg, instance, unlock=True)
3685
3686     # so check manually all the devices
3687     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3688       cfg.SetDiskID(dev, instance.primary_node)
3689       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3690       if is_degr:
3691         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3692
3693     # Step: remove old storage
3694     self.processor.LogStep(6, steps_total, "removing old storage")
3695     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3696       info("remove logical volumes for %s" % name)
3697       for lv in old_lvs:
3698         cfg.SetDiskID(lv, tgt_node)
3699         if not rpc.call_blockdev_remove(tgt_node, lv):
3700           warning("Can't remove old LV", "manually remove unused LVs")
3701           continue
3702
3703   def _ExecD8Secondary(self, feedback_fn):
3704     """Replace the secondary node for drbd8.
3705
3706     The algorithm for replace is quite complicated:
3707       - for all disks of the instance:
3708         - create new LVs on the new node with same names
3709         - shutdown the drbd device on the old secondary
3710         - disconnect the drbd network on the primary
3711         - create the drbd device on the new secondary
3712         - network attach the drbd on the primary, using an artifice:
3713           the drbd code for Attach() will connect to the network if it
3714           finds a device which is connected to the good local disks but
3715           not network enabled
3716       - wait for sync across all devices
3717       - remove all disks from the old secondary
3718
3719     Failures are not very well handled.
3720     """
3721     instance = self.instance
3722     iv_names = {}
3723     vgname = self.cfg.GetVGName()
3724     # start of work
3725     cfg = self.cfg
3726     old_node = self.tgt_node
3727     new_node = self.new_node
3728     pri_node = instance.primary_node
3729     for dev in instance.disks:
3730       size = dev.size
3731       logger.Info("adding new local storage on %s for %s" %
3732                   (new_node, dev.iv_name))
3733       # since we *always* want to create this LV, we use the
3734       # _Create...OnPrimary (which forces the creation), even if we
3735       # are talking about the secondary node
3736       for new_lv in dev.children:
3737         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3738                                         _GetInstanceInfoText(instance)):
3739           raise errors.OpExecError("Failed to create new LV named '%s' on"
3740                                    " node '%s'" %
3741                                    (new_lv.logical_id[1], new_node))
3742
3743       # create new devices on new_node
3744       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3745                               logical_id=(pri_node, new_node,
3746                                           dev.logical_id[2]),
3747                               children=dev.children)
3748       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3749                                         new_drbd, False,
3750                                       _GetInstanceInfoText(instance)):
3751         raise errors.OpExecError("Failed to create new DRBD on"
3752                                  " node '%s'" % new_node)
3753
3754       # we have new devices, shutdown the drbd on the old secondary
3755       cfg.SetDiskID(dev, old_node)
3756       if not rpc.call_blockdev_shutdown(old_node, dev):
3757         raise errors.OpExecError("Failed to shutdown DRBD on old node")
3758
3759       # we have new storage, we 'rename' the network on the primary
3760       cfg.SetDiskID(dev, pri_node)
3761       # rename to the ip of the new node
3762       new_uid = list(dev.physical_id)
3763       new_uid[2] = self.remote_node_info.secondary_ip
3764       rlist = [(dev, tuple(new_uid))]
3765       if not rpc.call_blockdev_rename(pri_node, rlist):
3766         raise errors.OpExecError("Can't detach re-attach drbd %s on node"
3767                                  " %s from %s to %s" %
3768                                  (dev.iv_name, pri_node, old_node, new_node))
3769       dev.logical_id = (pri_node, new_node, dev.logical_id[2])
3770       cfg.SetDiskID(dev, pri_node)
3771       cfg.Update(instance)
3772
3773       iv_names[dev.iv_name] = (dev, dev.children)
3774
3775     # this can fail as the old devices are degraded and _WaitForSync
3776     # does a combined result over all disks, so we don't check its
3777     # return value
3778     logger.Info("Done changing drbd configs, waiting for sync")
3779     _WaitForSync(cfg, instance, unlock=True)
3780
3781     # so check manually all the devices
3782     for name, (dev, old_lvs) in iv_names.iteritems():
3783       cfg.SetDiskID(dev, pri_node)
3784       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3785       if is_degr:
3786         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3787
3788     for name, (dev, old_lvs) in iv_names.iteritems():
3789       logger.Info("remove logical volumes for %s" % name)
3790       for lv in old_lvs:
3791         cfg.SetDiskID(lv, old_node)
3792         if not rpc.call_blockdev_remove(old_node, lv):
3793           logger.Error("Can't cleanup child device, skipping. You need to"
3794                        " fix manually!")
3795           continue
3796
3797   def Exec(self, feedback_fn):
3798     """Execute disk replacement.
3799
3800     This dispatches the disk replacement to the appropriate handler.
3801
3802     """
3803     instance = self.instance
3804     if instance.disk_template == constants.DT_REMOTE_RAID1:
3805       fn = self._ExecRR1
3806     elif instance.disk_template == constants.DT_DRBD8:
3807       if self.op.remote_node is None:
3808         fn = self._ExecD8DiskOnly
3809       else:
3810         fn = self._ExecD8Secondary
3811     else:
3812       raise errors.ProgrammerError("Unhandled disk replacement case")
3813     return fn(feedback_fn)
3814
3815
3816 class LUQueryInstanceData(NoHooksLU):
3817   """Query runtime instance data.
3818
3819   """
3820   _OP_REQP = ["instances"]
3821
3822   def CheckPrereq(self):
3823     """Check prerequisites.
3824
3825     This only checks the optional instance list against the existing names.
3826
3827     """
3828     if not isinstance(self.op.instances, list):
3829       raise errors.OpPrereqError("Invalid argument type 'instances'")
3830     if self.op.instances:
3831       self.wanted_instances = []
3832       names = self.op.instances
3833       for name in names:
3834         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3835         if instance is None:
3836           raise errors.OpPrereqError("No such instance name '%s'" % name)
3837       self.wanted_instances.append(instance)
3838     else:
3839       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3840                                in self.cfg.GetInstanceList()]
3841     return
3842
3843
3844   def _ComputeDiskStatus(self, instance, snode, dev):
3845     """Compute block device status.
3846
3847     """
3848     self.cfg.SetDiskID(dev, instance.primary_node)
3849     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3850     if dev.dev_type in constants.LDS_DRBD:
3851       # we change the snode then (otherwise we use the one passed in)
3852       if dev.logical_id[0] == instance.primary_node:
3853         snode = dev.logical_id[1]
3854       else:
3855         snode = dev.logical_id[0]
3856
3857     if snode:
3858       self.cfg.SetDiskID(dev, snode)
3859       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3860     else:
3861       dev_sstatus = None
3862
3863     if dev.children:
3864       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3865                       for child in dev.children]
3866     else:
3867       dev_children = []
3868
3869     data = {
3870       "iv_name": dev.iv_name,
3871       "dev_type": dev.dev_type,
3872       "logical_id": dev.logical_id,
3873       "physical_id": dev.physical_id,
3874       "pstatus": dev_pstatus,
3875       "sstatus": dev_sstatus,
3876       "children": dev_children,
3877       }
3878
3879     return data
3880
3881   def Exec(self, feedback_fn):
3882     """Gather and return data"""
3883     result = {}
3884     for instance in self.wanted_instances:
3885       remote_info = rpc.call_instance_info(instance.primary_node,
3886                                                 instance.name)
3887       if remote_info and "state" in remote_info:
3888         remote_state = "up"
3889       else:
3890         remote_state = "down"
3891       if instance.status == "down":
3892         config_state = "down"
3893       else:
3894         config_state = "up"
3895
3896       disks = [self._ComputeDiskStatus(instance, None, device)
3897                for device in instance.disks]
3898
3899       idict = {
3900         "name": instance.name,
3901         "config_state": config_state,
3902         "run_state": remote_state,
3903         "pnode": instance.primary_node,
3904         "snodes": instance.secondary_nodes,
3905         "os": instance.os,
3906         "memory": instance.memory,
3907         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3908         "disks": disks,
3909         "vcpus": instance.vcpus,
3910         }
3911
3912       result[instance.name] = idict
3913
3914     return result
3915
3916
3917 class LUSetInstanceParms(LogicalUnit):
3918   """Modifies an instances's parameters.
3919
3920   """
3921   HPATH = "instance-modify"
3922   HTYPE = constants.HTYPE_INSTANCE
3923   _OP_REQP = ["instance_name"]
3924
3925   def BuildHooksEnv(self):
3926     """Build hooks env.
3927
3928     This runs on the master, primary and secondaries.
3929
3930     """
3931     args = dict()
3932     if self.mem:
3933       args['memory'] = self.mem
3934     if self.vcpus:
3935       args['vcpus'] = self.vcpus
3936     if self.do_ip or self.do_bridge:
3937       if self.do_ip:
3938         ip = self.ip
3939       else:
3940         ip = self.instance.nics[0].ip
3941       if self.bridge:
3942         bridge = self.bridge
3943       else:
3944         bridge = self.instance.nics[0].bridge
3945       args['nics'] = [(ip, bridge)]
3946     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3947     nl = [self.sstore.GetMasterNode(),
3948           self.instance.primary_node] + list(self.instance.secondary_nodes)
3949     return env, nl, nl
3950
3951   def CheckPrereq(self):
3952     """Check prerequisites.
3953
3954     This only checks the instance list against the existing names.
3955
3956     """
3957     self.mem = getattr(self.op, "mem", None)
3958     self.vcpus = getattr(self.op, "vcpus", None)
3959     self.ip = getattr(self.op, "ip", None)
3960     self.bridge = getattr(self.op, "bridge", None)
3961     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3962       raise errors.OpPrereqError("No changes submitted")
3963     if self.mem is not None:
3964       try:
3965         self.mem = int(self.mem)
3966       except ValueError, err:
3967         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3968     if self.vcpus is not None:
3969       try:
3970         self.vcpus = int(self.vcpus)
3971       except ValueError, err:
3972         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3973     if self.ip is not None:
3974       self.do_ip = True
3975       if self.ip.lower() == "none":
3976         self.ip = None
3977       else:
3978         if not utils.IsValidIP(self.ip):
3979           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3980     else:
3981       self.do_ip = False
3982     self.do_bridge = (self.bridge is not None)
3983
3984     instance = self.cfg.GetInstanceInfo(
3985       self.cfg.ExpandInstanceName(self.op.instance_name))
3986     if instance is None:
3987       raise errors.OpPrereqError("No such instance name '%s'" %
3988                                  self.op.instance_name)
3989     self.op.instance_name = instance.name
3990     self.instance = instance
3991     return
3992
3993   def Exec(self, feedback_fn):
3994     """Modifies an instance.
3995
3996     All parameters take effect only at the next restart of the instance.
3997     """
3998     result = []
3999     instance = self.instance
4000     if self.mem:
4001       instance.memory = self.mem
4002       result.append(("mem", self.mem))
4003     if self.vcpus:
4004       instance.vcpus = self.vcpus
4005       result.append(("vcpus",  self.vcpus))
4006     if self.do_ip:
4007       instance.nics[0].ip = self.ip
4008       result.append(("ip", self.ip))
4009     if self.bridge:
4010       instance.nics[0].bridge = self.bridge
4011       result.append(("bridge", self.bridge))
4012
4013     self.cfg.AddInstance(instance)
4014
4015     return result
4016
4017
4018 class LUQueryExports(NoHooksLU):
4019   """Query the exports list
4020
4021   """
4022   _OP_REQP = []
4023
4024   def CheckPrereq(self):
4025     """Check that the nodelist contains only existing nodes.
4026
4027     """
4028     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4029
4030   def Exec(self, feedback_fn):
4031     """Compute the list of all the exported system images.
4032
4033     Returns:
4034       a dictionary with the structure node->(export-list)
4035       where export-list is a list of the instances exported on
4036       that node.
4037
4038     """
4039     return rpc.call_export_list(self.nodes)
4040
4041
4042 class LUExportInstance(LogicalUnit):
4043   """Export an instance to an image in the cluster.
4044
4045   """
4046   HPATH = "instance-export"
4047   HTYPE = constants.HTYPE_INSTANCE
4048   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4049
4050   def BuildHooksEnv(self):
4051     """Build hooks env.
4052
4053     This will run on the master, primary node and target node.
4054
4055     """
4056     env = {
4057       "EXPORT_NODE": self.op.target_node,
4058       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4059       }
4060     env.update(_BuildInstanceHookEnvByObject(self.instance))
4061     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4062           self.op.target_node]
4063     return env, nl, nl
4064
4065   def CheckPrereq(self):
4066     """Check prerequisites.
4067
4068     This checks that the instance name is a valid one.
4069
4070     """
4071     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4072     self.instance = self.cfg.GetInstanceInfo(instance_name)
4073     if self.instance is None:
4074       raise errors.OpPrereqError("Instance '%s' not found" %
4075                                  self.op.instance_name)
4076
4077     # node verification
4078     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4079     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4080
4081     if self.dst_node is None:
4082       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4083                                  self.op.target_node)
4084     self.op.target_node = self.dst_node.name
4085
4086   def Exec(self, feedback_fn):
4087     """Export an instance to an image in the cluster.
4088
4089     """
4090     instance = self.instance
4091     dst_node = self.dst_node
4092     src_node = instance.primary_node
4093     # shutdown the instance, unless requested not to do so
4094     if self.op.shutdown:
4095       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4096       self.processor.ChainOpCode(op)
4097
4098     vgname = self.cfg.GetVGName()
4099
4100     snap_disks = []
4101
4102     try:
4103       for disk in instance.disks:
4104         if disk.iv_name == "sda":
4105           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4106           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4107
4108           if not new_dev_name:
4109             logger.Error("could not snapshot block device %s on node %s" %
4110                          (disk.logical_id[1], src_node))
4111           else:
4112             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4113                                       logical_id=(vgname, new_dev_name),
4114                                       physical_id=(vgname, new_dev_name),
4115                                       iv_name=disk.iv_name)
4116             snap_disks.append(new_dev)
4117
4118     finally:
4119       if self.op.shutdown:
4120         op = opcodes.OpStartupInstance(instance_name=instance.name,
4121                                        force=False)
4122         self.processor.ChainOpCode(op)
4123
4124     # TODO: check for size
4125
4126     for dev in snap_disks:
4127       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4128                                            instance):
4129         logger.Error("could not export block device %s from node"
4130                      " %s to node %s" %
4131                      (dev.logical_id[1], src_node, dst_node.name))
4132       if not rpc.call_blockdev_remove(src_node, dev):
4133         logger.Error("could not remove snapshot block device %s from"
4134                      " node %s" % (dev.logical_id[1], src_node))
4135
4136     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4137       logger.Error("could not finalize export for instance %s on node %s" %
4138                    (instance.name, dst_node.name))
4139
4140     nodelist = self.cfg.GetNodeList()
4141     nodelist.remove(dst_node.name)
4142
4143     # on one-node clusters nodelist will be empty after the removal
4144     # if we proceed the backup would be removed because OpQueryExports
4145     # substitutes an empty list with the full cluster node list.
4146     if nodelist:
4147       op = opcodes.OpQueryExports(nodes=nodelist)
4148       exportlist = self.processor.ChainOpCode(op)
4149       for node in exportlist:
4150         if instance.name in exportlist[node]:
4151           if not rpc.call_export_remove(node, instance.name):
4152             logger.Error("could not remove older export for instance %s"
4153                          " on node %s" % (instance.name, node))
4154
4155
4156 class TagsLU(NoHooksLU):
4157   """Generic tags LU.
4158
4159   This is an abstract class which is the parent of all the other tags LUs.
4160
4161   """
4162   def CheckPrereq(self):
4163     """Check prerequisites.
4164
4165     """
4166     if self.op.kind == constants.TAG_CLUSTER:
4167       self.target = self.cfg.GetClusterInfo()
4168     elif self.op.kind == constants.TAG_NODE:
4169       name = self.cfg.ExpandNodeName(self.op.name)
4170       if name is None:
4171         raise errors.OpPrereqError("Invalid node name (%s)" %
4172                                    (self.op.name,))
4173       self.op.name = name
4174       self.target = self.cfg.GetNodeInfo(name)
4175     elif self.op.kind == constants.TAG_INSTANCE:
4176       name = self.cfg.ExpandInstanceName(self.op.name)
4177       if name is None:
4178         raise errors.OpPrereqError("Invalid instance name (%s)" %
4179                                    (self.op.name,))
4180       self.op.name = name
4181       self.target = self.cfg.GetInstanceInfo(name)
4182     else:
4183       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4184                                  str(self.op.kind))
4185
4186
4187 class LUGetTags(TagsLU):
4188   """Returns the tags of a given object.
4189
4190   """
4191   _OP_REQP = ["kind", "name"]
4192
4193   def Exec(self, feedback_fn):
4194     """Returns the tag list.
4195
4196     """
4197     return self.target.GetTags()
4198
4199
4200 class LUSearchTags(NoHooksLU):
4201   """Searches the tags for a given pattern.
4202
4203   """
4204   _OP_REQP = ["pattern"]
4205
4206   def CheckPrereq(self):
4207     """Check prerequisites.
4208
4209     This checks the pattern passed for validity by compiling it.
4210
4211     """
4212     try:
4213       self.re = re.compile(self.op.pattern)
4214     except re.error, err:
4215       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4216                                  (self.op.pattern, err))
4217
4218   def Exec(self, feedback_fn):
4219     """Returns the tag list.
4220
4221     """
4222     cfg = self.cfg
4223     tgts = [("/cluster", cfg.GetClusterInfo())]
4224     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4225     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4226     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4227     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4228     results = []
4229     for path, target in tgts:
4230       for tag in target.GetTags():
4231         if self.re.search(tag):
4232           results.append((path, tag))
4233     return results
4234
4235
4236 class LUAddTags(TagsLU):
4237   """Sets a tag on a given object.
4238
4239   """
4240   _OP_REQP = ["kind", "name", "tags"]
4241
4242   def CheckPrereq(self):
4243     """Check prerequisites.
4244
4245     This checks the type and length of the tag name and value.
4246
4247     """
4248     TagsLU.CheckPrereq(self)
4249     for tag in self.op.tags:
4250       objects.TaggableObject.ValidateTag(tag)
4251
4252   def Exec(self, feedback_fn):
4253     """Sets the tag.
4254
4255     """
4256     try:
4257       for tag in self.op.tags:
4258         self.target.AddTag(tag)
4259     except errors.TagError, err:
4260       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4261     try:
4262       self.cfg.Update(self.target)
4263     except errors.ConfigurationError:
4264       raise errors.OpRetryError("There has been a modification to the"
4265                                 " config file and the operation has been"
4266                                 " aborted. Please retry.")
4267
4268
4269 class LUDelTags(TagsLU):
4270   """Delete a list of tags from a given object.
4271
4272   """
4273   _OP_REQP = ["kind", "name", "tags"]
4274
4275   def CheckPrereq(self):
4276     """Check prerequisites.
4277
4278     This checks that we have the given tag.
4279
4280     """
4281     TagsLU.CheckPrereq(self)
4282     for tag in self.op.tags:
4283       objects.TaggableObject.ValidateTag(tag)
4284     del_tags = frozenset(self.op.tags)
4285     cur_tags = self.target.GetTags()
4286     if not del_tags <= cur_tags:
4287       diff_tags = del_tags - cur_tags
4288       diff_names = ["'%s'" % tag for tag in diff_tags]
4289       diff_names.sort()
4290       raise errors.OpPrereqError("Tag(s) %s not found" %
4291                                  (",".join(diff_names)))
4292
4293   def Exec(self, feedback_fn):
4294     """Remove the tag from the object.
4295
4296     """
4297     for tag in self.op.tags:
4298       self.target.RemoveTag(tag)
4299     try:
4300       self.cfg.Update(self.target)
4301     except errors.ConfigurationError:
4302       raise errors.OpRetryError("There has been a modification to the"
4303                                 " config file and the operation has been"
4304                                 " aborted. Please retry.")