Allow force removal of instances
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError("Required parameter '%s' missing" %
81                                    attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError("Cluster not initialized yet,"
85                                    " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != utils.HostInfo().name:
89           raise errors.OpPrereqError("Commands must be run on the master"
90                                      " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return {}, [], []
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded node names.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if not isinstance(nodes, list):
175     raise errors.OpPrereqError("Invalid argument type 'nodes'")
176
177   if nodes:
178     wanted = []
179
180     for name in nodes:
181       node = lu.cfg.ExpandNodeName(name)
182       if node is None:
183         raise errors.OpPrereqError("No such node name '%s'" % name)
184       wanted.append(node)
185
186   else:
187     wanted = lu.cfg.GetNodeList()
188   return utils.NiceSort(wanted)
189
190
191 def _GetWantedInstances(lu, instances):
192   """Returns list of checked and expanded instance names.
193
194   Args:
195     instances: List of instances (strings) or None for all
196
197   """
198   if not isinstance(instances, list):
199     raise errors.OpPrereqError("Invalid argument type 'instances'")
200
201   if instances:
202     wanted = []
203
204     for name in instances:
205       instance = lu.cfg.ExpandInstanceName(name)
206       if instance is None:
207         raise errors.OpPrereqError("No such instance name '%s'" % name)
208       wanted.append(instance)
209
210   else:
211     wanted = lu.cfg.GetInstanceList()
212   return utils.NiceSort(wanted)
213
214
215 def _CheckOutputFields(static, dynamic, selected):
216   """Checks whether all selected fields are valid.
217
218   Args:
219     static: Static fields
220     dynamic: Dynamic fields
221
222   """
223   static_fields = frozenset(static)
224   dynamic_fields = frozenset(dynamic)
225
226   all_fields = static_fields | dynamic_fields
227
228   if not all_fields.issuperset(selected):
229     raise errors.OpPrereqError("Unknown output fields selected: %s"
230                                % ",".join(frozenset(selected).
231                                           difference(all_fields)))
232
233
234 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235                           memory, vcpus, nics):
236   """Builds instance related env variables for hooks from single variables.
237
238   Args:
239     secondary_nodes: List of secondary nodes as strings
240   """
241   env = {
242     "OP_TARGET": name,
243     "INSTANCE_NAME": name,
244     "INSTANCE_PRIMARY": primary_node,
245     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
246     "INSTANCE_OS_TYPE": os_type,
247     "INSTANCE_STATUS": status,
248     "INSTANCE_MEMORY": memory,
249     "INSTANCE_VCPUS": vcpus,
250   }
251
252   if nics:
253     nic_count = len(nics)
254     for idx, (ip, bridge) in enumerate(nics):
255       if ip is None:
256         ip = ""
257       env["INSTANCE_NIC%d_IP" % idx] = ip
258       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
259   else:
260     nic_count = 0
261
262   env["INSTANCE_NIC_COUNT"] = nic_count
263
264   return env
265
266
267 def _BuildInstanceHookEnvByObject(instance, override=None):
268   """Builds instance related env variables for hooks from an object.
269
270   Args:
271     instance: objects.Instance object of instance
272     override: dict of values to override
273   """
274   args = {
275     'name': instance.name,
276     'primary_node': instance.primary_node,
277     'secondary_nodes': instance.secondary_nodes,
278     'os_type': instance.os,
279     'status': instance.os,
280     'memory': instance.memory,
281     'vcpus': instance.vcpus,
282     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
283   }
284   if override:
285     args.update(override)
286   return _BuildInstanceHookEnv(**args)
287
288
289 def _UpdateEtcHosts(fullnode, ip):
290   """Ensure a node has a correct entry in /etc/hosts.
291
292   Args:
293     fullnode - Fully qualified domain name of host. (str)
294     ip       - IPv4 address of host (str)
295
296   """
297   node = fullnode.split(".", 1)[0]
298
299   f = open('/etc/hosts', 'r+')
300
301   inthere = False
302
303   save_lines = []
304   add_lines = []
305   removed = False
306
307   while True:
308     rawline = f.readline()
309
310     if not rawline:
311       # End of file
312       break
313
314     line = rawline.split('\n')[0]
315
316     # Strip off comments
317     line = line.split('#')[0]
318
319     if not line:
320       # Entire line was comment, skip
321       save_lines.append(rawline)
322       continue
323
324     fields = line.split()
325
326     haveall = True
327     havesome = False
328     for spec in [ ip, fullnode, node ]:
329       if spec not in fields:
330         haveall = False
331       if spec in fields:
332         havesome = True
333
334     if haveall:
335       inthere = True
336       save_lines.append(rawline)
337       continue
338
339     if havesome and not haveall:
340       # Line (old, or manual?) which is missing some.  Remove.
341       removed = True
342       continue
343
344     save_lines.append(rawline)
345
346   if not inthere:
347     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
348
349   if removed:
350     if add_lines:
351       save_lines = save_lines + add_lines
352
353     # We removed a line, write a new file and replace old.
354     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
355     newfile = os.fdopen(fd, 'w')
356     newfile.write(''.join(save_lines))
357     newfile.close()
358     os.rename(tmpname, '/etc/hosts')
359
360   elif add_lines:
361     # Simply appending a new line will do the trick.
362     f.seek(0, 2)
363     for add in add_lines:
364       f.write(add)
365
366   f.close()
367
368
369 def _UpdateKnownHosts(fullnode, ip, pubkey):
370   """Ensure a node has a correct known_hosts entry.
371
372   Args:
373     fullnode - Fully qualified domain name of host. (str)
374     ip       - IPv4 address of host (str)
375     pubkey   - the public key of the cluster
376
377   """
378   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
379     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
380   else:
381     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
382
383   inthere = False
384
385   save_lines = []
386   add_lines = []
387   removed = False
388
389   while True:
390     rawline = f.readline()
391     logger.Debug('read %s' % (repr(rawline),))
392
393     if not rawline:
394       # End of file
395       break
396
397     line = rawline.split('\n')[0]
398
399     parts = line.split(' ')
400     fields = parts[0].split(',')
401     key = parts[2]
402
403     haveall = True
404     havesome = False
405     for spec in [ ip, fullnode ]:
406       if spec not in fields:
407         haveall = False
408       if spec in fields:
409         havesome = True
410
411     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
412     if haveall and key == pubkey:
413       inthere = True
414       save_lines.append(rawline)
415       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
416       continue
417
418     if havesome and (not haveall or key != pubkey):
419       removed = True
420       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
421       continue
422
423     save_lines.append(rawline)
424
425   if not inthere:
426     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
427     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
428
429   if removed:
430     save_lines = save_lines + add_lines
431
432     # Write a new file and replace old.
433     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
434                                    constants.DATA_DIR)
435     newfile = os.fdopen(fd, 'w')
436     try:
437       newfile.write(''.join(save_lines))
438     finally:
439       newfile.close()
440     logger.Debug("Wrote new known_hosts.")
441     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
442
443   elif add_lines:
444     # Simply appending a new line will do the trick.
445     f.seek(0, 2)
446     for add in add_lines:
447       f.write(add)
448
449   f.close()
450
451
452 def _HasValidVG(vglist, vgname):
453   """Checks if the volume group list is valid.
454
455   A non-None return value means there's an error, and the return value
456   is the error message.
457
458   """
459   vgsize = vglist.get(vgname, None)
460   if vgsize is None:
461     return "volume group '%s' missing" % vgname
462   elif vgsize < 20480:
463     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
464             (vgname, vgsize))
465   return None
466
467
468 def _InitSSHSetup(node):
469   """Setup the SSH configuration for the cluster.
470
471
472   This generates a dsa keypair for root, adds the pub key to the
473   permitted hosts and adds the hostkey to its own known hosts.
474
475   Args:
476     node: the name of this host as a fqdn
477
478   """
479   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
480
481   for name in priv_key, pub_key:
482     if os.path.exists(name):
483       utils.CreateBackup(name)
484     utils.RemoveFile(name)
485
486   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
487                          "-f", priv_key,
488                          "-q", "-N", ""])
489   if result.failed:
490     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
491                              result.output)
492
493   f = open(pub_key, 'r')
494   try:
495     utils.AddAuthorizedKey(auth_keys, f.read(8192))
496   finally:
497     f.close()
498
499
500 def _InitGanetiServerSetup(ss):
501   """Setup the necessary configuration for the initial node daemon.
502
503   This creates the nodepass file containing the shared password for
504   the cluster and also generates the SSL certificate.
505
506   """
507   # Create pseudo random password
508   randpass = sha.new(os.urandom(64)).hexdigest()
509   # and write it into sstore
510   ss.SetKey(ss.SS_NODED_PASS, randpass)
511
512   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
513                          "-days", str(365*5), "-nodes", "-x509",
514                          "-keyout", constants.SSL_CERT_FILE,
515                          "-out", constants.SSL_CERT_FILE, "-batch"])
516   if result.failed:
517     raise errors.OpExecError("could not generate server ssl cert, command"
518                              " %s had exitcode %s and error message %s" %
519                              (result.cmd, result.exit_code, result.output))
520
521   os.chmod(constants.SSL_CERT_FILE, 0400)
522
523   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
524
525   if result.failed:
526     raise errors.OpExecError("Could not start the node daemon, command %s"
527                              " had exitcode %s and error %s" %
528                              (result.cmd, result.exit_code, result.output))
529
530
531 class LUInitCluster(LogicalUnit):
532   """Initialise the cluster.
533
534   """
535   HPATH = "cluster-init"
536   HTYPE = constants.HTYPE_CLUSTER
537   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
538               "def_bridge", "master_netdev"]
539   REQ_CLUSTER = False
540
541   def BuildHooksEnv(self):
542     """Build hooks env.
543
544     Notes: Since we don't require a cluster, we must manually add
545     ourselves in the post-run node list.
546
547     """
548     env = {"OP_TARGET": self.op.cluster_name}
549     return env, [], [self.hostname.name]
550
551   def CheckPrereq(self):
552     """Verify that the passed name is a valid one.
553
554     """
555     if config.ConfigWriter.IsCluster():
556       raise errors.OpPrereqError("Cluster is already initialised")
557
558     self.hostname = hostname = utils.HostInfo()
559
560     if hostname.ip.startswith("127."):
561       raise errors.OpPrereqError("This host's IP resolves to the private"
562                                  " range (%s). Please fix DNS or /etc/hosts." %
563                                  (hostname.ip,))
564
565     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
566
567     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
568                          constants.DEFAULT_NODED_PORT):
569       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
570                                  " to %s,\nbut this ip address does not"
571                                  " belong to this host."
572                                  " Aborting." % hostname.ip)
573
574     secondary_ip = getattr(self.op, "secondary_ip", None)
575     if secondary_ip and not utils.IsValidIP(secondary_ip):
576       raise errors.OpPrereqError("Invalid secondary ip given")
577     if (secondary_ip and
578         secondary_ip != hostname.ip and
579         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
580                            constants.DEFAULT_NODED_PORT))):
581       raise errors.OpPrereqError("You gave %s as secondary IP,\n"
582                                  "but it does not belong to this host." %
583                                  secondary_ip)
584     self.secondary_ip = secondary_ip
585
586     # checks presence of the volume group given
587     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
588
589     if vgstatus:
590       raise errors.OpPrereqError("Error: %s" % vgstatus)
591
592     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
593                     self.op.mac_prefix):
594       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
595                                  self.op.mac_prefix)
596
597     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
598       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
599                                  self.op.hypervisor_type)
600
601     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
602     if result.failed:
603       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
604                                  (self.op.master_netdev,
605                                   result.output.strip()))
606
607   def Exec(self, feedback_fn):
608     """Initialize the cluster.
609
610     """
611     clustername = self.clustername
612     hostname = self.hostname
613
614     # set up the simple store
615     self.sstore = ss = ssconf.SimpleStore()
616     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
617     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
618     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
619     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
620     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
621
622     # set up the inter-node password and certificate
623     _InitGanetiServerSetup(ss)
624
625     # start the master ip
626     rpc.call_node_start_master(hostname.name)
627
628     # set up ssh config and /etc/hosts
629     f = open(constants.SSH_HOST_RSA_PUB, 'r')
630     try:
631       sshline = f.read()
632     finally:
633       f.close()
634     sshkey = sshline.split(" ")[1]
635
636     _UpdateEtcHosts(hostname.name, hostname.ip)
637
638     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
639
640     _InitSSHSetup(hostname.name)
641
642     # init of cluster config file
643     self.cfg = cfgw = config.ConfigWriter()
644     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
645                     sshkey, self.op.mac_prefix,
646                     self.op.vg_name, self.op.def_bridge)
647
648
649 class LUDestroyCluster(NoHooksLU):
650   """Logical unit for destroying the cluster.
651
652   """
653   _OP_REQP = []
654
655   def CheckPrereq(self):
656     """Check prerequisites.
657
658     This checks whether the cluster is empty.
659
660     Any errors are signalled by raising errors.OpPrereqError.
661
662     """
663     master = self.sstore.GetMasterNode()
664
665     nodelist = self.cfg.GetNodeList()
666     if len(nodelist) != 1 or nodelist[0] != master:
667       raise errors.OpPrereqError("There are still %d node(s) in"
668                                  " this cluster." % (len(nodelist) - 1))
669     instancelist = self.cfg.GetInstanceList()
670     if instancelist:
671       raise errors.OpPrereqError("There are still %d instance(s) in"
672                                  " this cluster." % len(instancelist))
673
674   def Exec(self, feedback_fn):
675     """Destroys the cluster.
676
677     """
678     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
679     utils.CreateBackup(priv_key)
680     utils.CreateBackup(pub_key)
681     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
682
683
684 class LUVerifyCluster(NoHooksLU):
685   """Verifies the cluster status.
686
687   """
688   _OP_REQP = []
689
690   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
691                   remote_version, feedback_fn):
692     """Run multiple tests against a node.
693
694     Test list:
695       - compares ganeti version
696       - checks vg existance and size > 20G
697       - checks config file checksum
698       - checks ssh to other nodes
699
700     Args:
701       node: name of the node to check
702       file_list: required list of files
703       local_cksum: dictionary of local files and their checksums
704
705     """
706     # compares ganeti version
707     local_version = constants.PROTOCOL_VERSION
708     if not remote_version:
709       feedback_fn(" - ERROR: connection to %s failed" % (node))
710       return True
711
712     if local_version != remote_version:
713       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
714                       (local_version, node, remote_version))
715       return True
716
717     # checks vg existance and size > 20G
718
719     bad = False
720     if not vglist:
721       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
722                       (node,))
723       bad = True
724     else:
725       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
726       if vgstatus:
727         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
728         bad = True
729
730     # checks config file checksum
731     # checks ssh to any
732
733     if 'filelist' not in node_result:
734       bad = True
735       feedback_fn("  - ERROR: node hasn't returned file checksum data")
736     else:
737       remote_cksum = node_result['filelist']
738       for file_name in file_list:
739         if file_name not in remote_cksum:
740           bad = True
741           feedback_fn("  - ERROR: file '%s' missing" % file_name)
742         elif remote_cksum[file_name] != local_cksum[file_name]:
743           bad = True
744           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
745
746     if 'nodelist' not in node_result:
747       bad = True
748       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
749     else:
750       if node_result['nodelist']:
751         bad = True
752         for node in node_result['nodelist']:
753           feedback_fn("  - ERROR: communication with node '%s': %s" %
754                           (node, node_result['nodelist'][node]))
755     hyp_result = node_result.get('hypervisor', None)
756     if hyp_result is not None:
757       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
758     return bad
759
760   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
761     """Verify an instance.
762
763     This function checks to see if the required block devices are
764     available on the instance's node.
765
766     """
767     bad = False
768
769     instancelist = self.cfg.GetInstanceList()
770     if not instance in instancelist:
771       feedback_fn("  - ERROR: instance %s not in instance list %s" %
772                       (instance, instancelist))
773       bad = True
774
775     instanceconfig = self.cfg.GetInstanceInfo(instance)
776     node_current = instanceconfig.primary_node
777
778     node_vol_should = {}
779     instanceconfig.MapLVsByNode(node_vol_should)
780
781     for node in node_vol_should:
782       for volume in node_vol_should[node]:
783         if node not in node_vol_is or volume not in node_vol_is[node]:
784           feedback_fn("  - ERROR: volume %s missing on node %s" %
785                           (volume, node))
786           bad = True
787
788     if not instanceconfig.status == 'down':
789       if not instance in node_instance[node_current]:
790         feedback_fn("  - ERROR: instance %s not running on node %s" %
791                         (instance, node_current))
792         bad = True
793
794     for node in node_instance:
795       if (not node == node_current):
796         if instance in node_instance[node]:
797           feedback_fn("  - ERROR: instance %s should not run on node %s" %
798                           (instance, node))
799           bad = True
800
801     return not bad
802
803   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
804     """Verify if there are any unknown volumes in the cluster.
805
806     The .os, .swap and backup volumes are ignored. All other volumes are
807     reported as unknown.
808
809     """
810     bad = False
811
812     for node in node_vol_is:
813       for volume in node_vol_is[node]:
814         if node not in node_vol_should or volume not in node_vol_should[node]:
815           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
816                       (volume, node))
817           bad = True
818     return bad
819
820   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
821     """Verify the list of running instances.
822
823     This checks what instances are running but unknown to the cluster.
824
825     """
826     bad = False
827     for node in node_instance:
828       for runninginstance in node_instance[node]:
829         if runninginstance not in instancelist:
830           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
831                           (runninginstance, node))
832           bad = True
833     return bad
834
835   def CheckPrereq(self):
836     """Check prerequisites.
837
838     This has no prerequisites.
839
840     """
841     pass
842
843   def Exec(self, feedback_fn):
844     """Verify integrity of cluster, performing various test on nodes.
845
846     """
847     bad = False
848     feedback_fn("* Verifying global settings")
849     self.cfg.VerifyConfig()
850
851     master = self.sstore.GetMasterNode()
852     vg_name = self.cfg.GetVGName()
853     nodelist = utils.NiceSort(self.cfg.GetNodeList())
854     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
855     node_volume = {}
856     node_instance = {}
857
858     # FIXME: verify OS list
859     # do local checksums
860     file_names = list(self.sstore.GetFileList())
861     file_names.append(constants.SSL_CERT_FILE)
862     file_names.append(constants.CLUSTER_CONF_FILE)
863     local_checksums = utils.FingerprintFiles(file_names)
864
865     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
866     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
867     all_instanceinfo = rpc.call_instance_list(nodelist)
868     all_vglist = rpc.call_vg_list(nodelist)
869     node_verify_param = {
870       'filelist': file_names,
871       'nodelist': nodelist,
872       'hypervisor': None,
873       }
874     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
875     all_rversion = rpc.call_version(nodelist)
876
877     for node in nodelist:
878       feedback_fn("* Verifying node %s" % node)
879       result = self._VerifyNode(node, file_names, local_checksums,
880                                 all_vglist[node], all_nvinfo[node],
881                                 all_rversion[node], feedback_fn)
882       bad = bad or result
883
884       # node_volume
885       volumeinfo = all_volumeinfo[node]
886
887       if type(volumeinfo) != dict:
888         feedback_fn("  - ERROR: connection to %s failed" % (node,))
889         bad = True
890         continue
891
892       node_volume[node] = volumeinfo
893
894       # node_instance
895       nodeinstance = all_instanceinfo[node]
896       if type(nodeinstance) != list:
897         feedback_fn("  - ERROR: connection to %s failed" % (node,))
898         bad = True
899         continue
900
901       node_instance[node] = nodeinstance
902
903     node_vol_should = {}
904
905     for instance in instancelist:
906       feedback_fn("* Verifying instance %s" % instance)
907       result =  self._VerifyInstance(instance, node_volume, node_instance,
908                                      feedback_fn)
909       bad = bad or result
910
911       inst_config = self.cfg.GetInstanceInfo(instance)
912
913       inst_config.MapLVsByNode(node_vol_should)
914
915     feedback_fn("* Verifying orphan volumes")
916     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
917                                        feedback_fn)
918     bad = bad or result
919
920     feedback_fn("* Verifying remaining instances")
921     result = self._VerifyOrphanInstances(instancelist, node_instance,
922                                          feedback_fn)
923     bad = bad or result
924
925     return int(bad)
926
927
928 class LURenameCluster(LogicalUnit):
929   """Rename the cluster.
930
931   """
932   HPATH = "cluster-rename"
933   HTYPE = constants.HTYPE_CLUSTER
934   _OP_REQP = ["name"]
935
936   def BuildHooksEnv(self):
937     """Build hooks env.
938
939     """
940     env = {
941       "OP_TARGET": self.op.sstore.GetClusterName(),
942       "NEW_NAME": self.op.name,
943       }
944     mn = self.sstore.GetMasterNode()
945     return env, [mn], [mn]
946
947   def CheckPrereq(self):
948     """Verify that the passed name is a valid one.
949
950     """
951     hostname = utils.HostInfo(self.op.name)
952
953     new_name = hostname.name
954     self.ip = new_ip = hostname.ip
955     old_name = self.sstore.GetClusterName()
956     old_ip = self.sstore.GetMasterIP()
957     if new_name == old_name and new_ip == old_ip:
958       raise errors.OpPrereqError("Neither the name nor the IP address of the"
959                                  " cluster has changed")
960     if new_ip != old_ip:
961       result = utils.RunCmd(["fping", "-q", new_ip])
962       if not result.failed:
963         raise errors.OpPrereqError("The given cluster IP address (%s) is"
964                                    " reachable on the network. Aborting." %
965                                    new_ip)
966
967     self.op.name = new_name
968
969   def Exec(self, feedback_fn):
970     """Rename the cluster.
971
972     """
973     clustername = self.op.name
974     ip = self.ip
975     ss = self.sstore
976
977     # shutdown the master IP
978     master = ss.GetMasterNode()
979     if not rpc.call_node_stop_master(master):
980       raise errors.OpExecError("Could not disable the master role")
981
982     try:
983       # modify the sstore
984       ss.SetKey(ss.SS_MASTER_IP, ip)
985       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
986
987       # Distribute updated ss config to all nodes
988       myself = self.cfg.GetNodeInfo(master)
989       dist_nodes = self.cfg.GetNodeList()
990       if myself.name in dist_nodes:
991         dist_nodes.remove(myself.name)
992
993       logger.Debug("Copying updated ssconf data to all nodes")
994       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
995         fname = ss.KeyToFilename(keyname)
996         result = rpc.call_upload_file(dist_nodes, fname)
997         for to_node in dist_nodes:
998           if not result[to_node]:
999             logger.Error("copy of file %s to node %s failed" %
1000                          (fname, to_node))
1001     finally:
1002       if not rpc.call_node_start_master(master):
1003         logger.Error("Could not re-enable the master role on the master,\n"
1004                      "please restart manually.")
1005
1006
1007 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1008   """Sleep and poll for an instance's disk to sync.
1009
1010   """
1011   if not instance.disks:
1012     return True
1013
1014   if not oneshot:
1015     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1016
1017   node = instance.primary_node
1018
1019   for dev in instance.disks:
1020     cfgw.SetDiskID(dev, node)
1021
1022   retries = 0
1023   while True:
1024     max_time = 0
1025     done = True
1026     cumul_degraded = False
1027     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1028     if not rstats:
1029       logger.ToStderr("Can't get any data from node %s" % node)
1030       retries += 1
1031       if retries >= 10:
1032         raise errors.RemoteError("Can't contact node %s for mirror data,"
1033                                  " aborting." % node)
1034       time.sleep(6)
1035       continue
1036     retries = 0
1037     for i in range(len(rstats)):
1038       mstat = rstats[i]
1039       if mstat is None:
1040         logger.ToStderr("Can't compute data for node %s/%s" %
1041                         (node, instance.disks[i].iv_name))
1042         continue
1043       perc_done, est_time, is_degraded = mstat
1044       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1045       if perc_done is not None:
1046         done = False
1047         if est_time is not None:
1048           rem_time = "%d estimated seconds remaining" % est_time
1049           max_time = est_time
1050         else:
1051           rem_time = "no time estimate"
1052         logger.ToStdout("- device %s: %5.2f%% done, %s" %
1053                         (instance.disks[i].iv_name, perc_done, rem_time))
1054     if done or oneshot:
1055       break
1056
1057     if unlock:
1058       utils.Unlock('cmd')
1059     try:
1060       time.sleep(min(60, max_time))
1061     finally:
1062       if unlock:
1063         utils.Lock('cmd')
1064
1065   if done:
1066     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1067   return not cumul_degraded
1068
1069
1070 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1071   """Check that mirrors are not degraded.
1072
1073   """
1074   cfgw.SetDiskID(dev, node)
1075
1076   result = True
1077   if on_primary or dev.AssembleOnSecondary():
1078     rstats = rpc.call_blockdev_find(node, dev)
1079     if not rstats:
1080       logger.ToStderr("Can't get any data from node %s" % node)
1081       result = False
1082     else:
1083       result = result and (not rstats[5])
1084   if dev.children:
1085     for child in dev.children:
1086       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1087
1088   return result
1089
1090
1091 class LUDiagnoseOS(NoHooksLU):
1092   """Logical unit for OS diagnose/query.
1093
1094   """
1095   _OP_REQP = []
1096
1097   def CheckPrereq(self):
1098     """Check prerequisites.
1099
1100     This always succeeds, since this is a pure query LU.
1101
1102     """
1103     return
1104
1105   def Exec(self, feedback_fn):
1106     """Compute the list of OSes.
1107
1108     """
1109     node_list = self.cfg.GetNodeList()
1110     node_data = rpc.call_os_diagnose(node_list)
1111     if node_data == False:
1112       raise errors.OpExecError("Can't gather the list of OSes")
1113     return node_data
1114
1115
1116 class LURemoveNode(LogicalUnit):
1117   """Logical unit for removing a node.
1118
1119   """
1120   HPATH = "node-remove"
1121   HTYPE = constants.HTYPE_NODE
1122   _OP_REQP = ["node_name"]
1123
1124   def BuildHooksEnv(self):
1125     """Build hooks env.
1126
1127     This doesn't run on the target node in the pre phase as a failed
1128     node would not allows itself to run.
1129
1130     """
1131     env = {
1132       "OP_TARGET": self.op.node_name,
1133       "NODE_NAME": self.op.node_name,
1134       }
1135     all_nodes = self.cfg.GetNodeList()
1136     all_nodes.remove(self.op.node_name)
1137     return env, all_nodes, all_nodes
1138
1139   def CheckPrereq(self):
1140     """Check prerequisites.
1141
1142     This checks:
1143      - the node exists in the configuration
1144      - it does not have primary or secondary instances
1145      - it's not the master
1146
1147     Any errors are signalled by raising errors.OpPrereqError.
1148
1149     """
1150     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1151     if node is None:
1152       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1153
1154     instance_list = self.cfg.GetInstanceList()
1155
1156     masternode = self.sstore.GetMasterNode()
1157     if node.name == masternode:
1158       raise errors.OpPrereqError("Node is the master node,"
1159                                  " you need to failover first.")
1160
1161     for instance_name in instance_list:
1162       instance = self.cfg.GetInstanceInfo(instance_name)
1163       if node.name == instance.primary_node:
1164         raise errors.OpPrereqError("Instance %s still running on the node,"
1165                                    " please remove first." % instance_name)
1166       if node.name in instance.secondary_nodes:
1167         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1168                                    " please remove first." % instance_name)
1169     self.op.node_name = node.name
1170     self.node = node
1171
1172   def Exec(self, feedback_fn):
1173     """Removes the node from the cluster.
1174
1175     """
1176     node = self.node
1177     logger.Info("stopping the node daemon and removing configs from node %s" %
1178                 node.name)
1179
1180     rpc.call_node_leave_cluster(node.name)
1181
1182     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1183
1184     logger.Info("Removing node %s from config" % node.name)
1185
1186     self.cfg.RemoveNode(node.name)
1187
1188
1189 class LUQueryNodes(NoHooksLU):
1190   """Logical unit for querying nodes.
1191
1192   """
1193   _OP_REQP = ["output_fields", "names"]
1194
1195   def CheckPrereq(self):
1196     """Check prerequisites.
1197
1198     This checks that the fields required are valid output fields.
1199
1200     """
1201     self.dynamic_fields = frozenset(["dtotal", "dfree",
1202                                      "mtotal", "mnode", "mfree",
1203                                      "bootid"])
1204
1205     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1206                                "pinst_list", "sinst_list",
1207                                "pip", "sip"],
1208                        dynamic=self.dynamic_fields,
1209                        selected=self.op.output_fields)
1210
1211     self.wanted = _GetWantedNodes(self, self.op.names)
1212
1213   def Exec(self, feedback_fn):
1214     """Computes the list of nodes and their attributes.
1215
1216     """
1217     nodenames = self.wanted
1218     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1219
1220     # begin data gathering
1221
1222     if self.dynamic_fields.intersection(self.op.output_fields):
1223       live_data = {}
1224       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1225       for name in nodenames:
1226         nodeinfo = node_data.get(name, None)
1227         if nodeinfo:
1228           live_data[name] = {
1229             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1230             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1231             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1232             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1233             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1234             "bootid": nodeinfo['bootid'],
1235             }
1236         else:
1237           live_data[name] = {}
1238     else:
1239       live_data = dict.fromkeys(nodenames, {})
1240
1241     node_to_primary = dict([(name, set()) for name in nodenames])
1242     node_to_secondary = dict([(name, set()) for name in nodenames])
1243
1244     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1245                              "sinst_cnt", "sinst_list"))
1246     if inst_fields & frozenset(self.op.output_fields):
1247       instancelist = self.cfg.GetInstanceList()
1248
1249       for instance_name in instancelist:
1250         inst = self.cfg.GetInstanceInfo(instance_name)
1251         if inst.primary_node in node_to_primary:
1252           node_to_primary[inst.primary_node].add(inst.name)
1253         for secnode in inst.secondary_nodes:
1254           if secnode in node_to_secondary:
1255             node_to_secondary[secnode].add(inst.name)
1256
1257     # end data gathering
1258
1259     output = []
1260     for node in nodelist:
1261       node_output = []
1262       for field in self.op.output_fields:
1263         if field == "name":
1264           val = node.name
1265         elif field == "pinst_list":
1266           val = list(node_to_primary[node.name])
1267         elif field == "sinst_list":
1268           val = list(node_to_secondary[node.name])
1269         elif field == "pinst_cnt":
1270           val = len(node_to_primary[node.name])
1271         elif field == "sinst_cnt":
1272           val = len(node_to_secondary[node.name])
1273         elif field == "pip":
1274           val = node.primary_ip
1275         elif field == "sip":
1276           val = node.secondary_ip
1277         elif field in self.dynamic_fields:
1278           val = live_data[node.name].get(field, None)
1279         else:
1280           raise errors.ParameterError(field)
1281         node_output.append(val)
1282       output.append(node_output)
1283
1284     return output
1285
1286
1287 class LUQueryNodeVolumes(NoHooksLU):
1288   """Logical unit for getting volumes on node(s).
1289
1290   """
1291   _OP_REQP = ["nodes", "output_fields"]
1292
1293   def CheckPrereq(self):
1294     """Check prerequisites.
1295
1296     This checks that the fields required are valid output fields.
1297
1298     """
1299     self.nodes = _GetWantedNodes(self, self.op.nodes)
1300
1301     _CheckOutputFields(static=["node"],
1302                        dynamic=["phys", "vg", "name", "size", "instance"],
1303                        selected=self.op.output_fields)
1304
1305
1306   def Exec(self, feedback_fn):
1307     """Computes the list of nodes and their attributes.
1308
1309     """
1310     nodenames = self.nodes
1311     volumes = rpc.call_node_volumes(nodenames)
1312
1313     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1314              in self.cfg.GetInstanceList()]
1315
1316     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1317
1318     output = []
1319     for node in nodenames:
1320       if node not in volumes or not volumes[node]:
1321         continue
1322
1323       node_vols = volumes[node][:]
1324       node_vols.sort(key=lambda vol: vol['dev'])
1325
1326       for vol in node_vols:
1327         node_output = []
1328         for field in self.op.output_fields:
1329           if field == "node":
1330             val = node
1331           elif field == "phys":
1332             val = vol['dev']
1333           elif field == "vg":
1334             val = vol['vg']
1335           elif field == "name":
1336             val = vol['name']
1337           elif field == "size":
1338             val = int(float(vol['size']))
1339           elif field == "instance":
1340             for inst in ilist:
1341               if node not in lv_by_node[inst]:
1342                 continue
1343               if vol['name'] in lv_by_node[inst][node]:
1344                 val = inst.name
1345                 break
1346             else:
1347               val = '-'
1348           else:
1349             raise errors.ParameterError(field)
1350           node_output.append(str(val))
1351
1352         output.append(node_output)
1353
1354     return output
1355
1356
1357 class LUAddNode(LogicalUnit):
1358   """Logical unit for adding node to the cluster.
1359
1360   """
1361   HPATH = "node-add"
1362   HTYPE = constants.HTYPE_NODE
1363   _OP_REQP = ["node_name"]
1364
1365   def BuildHooksEnv(self):
1366     """Build hooks env.
1367
1368     This will run on all nodes before, and on all nodes + the new node after.
1369
1370     """
1371     env = {
1372       "OP_TARGET": self.op.node_name,
1373       "NODE_NAME": self.op.node_name,
1374       "NODE_PIP": self.op.primary_ip,
1375       "NODE_SIP": self.op.secondary_ip,
1376       }
1377     nodes_0 = self.cfg.GetNodeList()
1378     nodes_1 = nodes_0 + [self.op.node_name, ]
1379     return env, nodes_0, nodes_1
1380
1381   def CheckPrereq(self):
1382     """Check prerequisites.
1383
1384     This checks:
1385      - the new node is not already in the config
1386      - it is resolvable
1387      - its parameters (single/dual homed) matches the cluster
1388
1389     Any errors are signalled by raising errors.OpPrereqError.
1390
1391     """
1392     node_name = self.op.node_name
1393     cfg = self.cfg
1394
1395     dns_data = utils.HostInfo(node_name)
1396
1397     node = dns_data.name
1398     primary_ip = self.op.primary_ip = dns_data.ip
1399     secondary_ip = getattr(self.op, "secondary_ip", None)
1400     if secondary_ip is None:
1401       secondary_ip = primary_ip
1402     if not utils.IsValidIP(secondary_ip):
1403       raise errors.OpPrereqError("Invalid secondary IP given")
1404     self.op.secondary_ip = secondary_ip
1405     node_list = cfg.GetNodeList()
1406     if node in node_list:
1407       raise errors.OpPrereqError("Node %s is already in the configuration"
1408                                  % node)
1409
1410     for existing_node_name in node_list:
1411       existing_node = cfg.GetNodeInfo(existing_node_name)
1412       if (existing_node.primary_ip == primary_ip or
1413           existing_node.secondary_ip == primary_ip or
1414           existing_node.primary_ip == secondary_ip or
1415           existing_node.secondary_ip == secondary_ip):
1416         raise errors.OpPrereqError("New node ip address(es) conflict with"
1417                                    " existing node %s" % existing_node.name)
1418
1419     # check that the type of the node (single versus dual homed) is the
1420     # same as for the master
1421     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1422     master_singlehomed = myself.secondary_ip == myself.primary_ip
1423     newbie_singlehomed = secondary_ip == primary_ip
1424     if master_singlehomed != newbie_singlehomed:
1425       if master_singlehomed:
1426         raise errors.OpPrereqError("The master has no private ip but the"
1427                                    " new node has one")
1428       else:
1429         raise errors.OpPrereqError("The master has a private ip but the"
1430                                    " new node doesn't have one")
1431
1432     # checks reachablity
1433     if not utils.TcpPing(utils.HostInfo().name,
1434                          primary_ip,
1435                          constants.DEFAULT_NODED_PORT):
1436       raise errors.OpPrereqError("Node not reachable by ping")
1437
1438     if not newbie_singlehomed:
1439       # check reachability from my secondary ip to newbie's secondary ip
1440       if not utils.TcpPing(myself.secondary_ip,
1441                            secondary_ip,
1442                            constants.DEFAULT_NODED_PORT):
1443         raise errors.OpPrereqError(
1444           "Node secondary ip not reachable by TCP based ping to noded port")
1445
1446     self.new_node = objects.Node(name=node,
1447                                  primary_ip=primary_ip,
1448                                  secondary_ip=secondary_ip)
1449
1450   def Exec(self, feedback_fn):
1451     """Adds the new node to the cluster.
1452
1453     """
1454     new_node = self.new_node
1455     node = new_node.name
1456
1457     # set up inter-node password and certificate and restarts the node daemon
1458     gntpass = self.sstore.GetNodeDaemonPassword()
1459     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1460       raise errors.OpExecError("ganeti password corruption detected")
1461     f = open(constants.SSL_CERT_FILE)
1462     try:
1463       gntpem = f.read(8192)
1464     finally:
1465       f.close()
1466     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1467     # so we use this to detect an invalid certificate; as long as the
1468     # cert doesn't contain this, the here-document will be correctly
1469     # parsed by the shell sequence below
1470     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1471       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1472     if not gntpem.endswith("\n"):
1473       raise errors.OpExecError("PEM must end with newline")
1474     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1475
1476     # and then connect with ssh to set password and start ganeti-noded
1477     # note that all the below variables are sanitized at this point,
1478     # either by being constants or by the checks above
1479     ss = self.sstore
1480     mycommand = ("umask 077 && "
1481                  "echo '%s' > '%s' && "
1482                  "cat > '%s' << '!EOF.' && \n"
1483                  "%s!EOF.\n%s restart" %
1484                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1485                   constants.SSL_CERT_FILE, gntpem,
1486                   constants.NODE_INITD_SCRIPT))
1487
1488     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1489     if result.failed:
1490       raise errors.OpExecError("Remote command on node %s, error: %s,"
1491                                " output: %s" %
1492                                (node, result.fail_reason, result.output))
1493
1494     # check connectivity
1495     time.sleep(4)
1496
1497     result = rpc.call_version([node])[node]
1498     if result:
1499       if constants.PROTOCOL_VERSION == result:
1500         logger.Info("communication to node %s fine, sw version %s match" %
1501                     (node, result))
1502       else:
1503         raise errors.OpExecError("Version mismatch master version %s,"
1504                                  " node version %s" %
1505                                  (constants.PROTOCOL_VERSION, result))
1506     else:
1507       raise errors.OpExecError("Cannot get version from the new node")
1508
1509     # setup ssh on node
1510     logger.Info("copy ssh key to node %s" % node)
1511     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1512     keyarray = []
1513     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1514                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1515                 priv_key, pub_key]
1516
1517     for i in keyfiles:
1518       f = open(i, 'r')
1519       try:
1520         keyarray.append(f.read())
1521       finally:
1522         f.close()
1523
1524     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1525                                keyarray[3], keyarray[4], keyarray[5])
1526
1527     if not result:
1528       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1529
1530     # Add node to our /etc/hosts, and add key to known_hosts
1531     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1532     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1533                       self.cfg.GetHostKey())
1534
1535     if new_node.secondary_ip != new_node.primary_ip:
1536       if not rpc.call_node_tcp_ping(new_node.name,
1537                                     constants.LOCALHOST_IP_ADDRESS,
1538                                     new_node.secondary_ip,
1539                                     constants.DEFAULT_NODED_PORT,
1540                                     10, False):
1541         raise errors.OpExecError("Node claims it doesn't have the"
1542                                  " secondary ip you gave (%s).\n"
1543                                  "Please fix and re-run this command." %
1544                                  new_node.secondary_ip)
1545
1546     success, msg = ssh.VerifyNodeHostname(node)
1547     if not success:
1548       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1549                                " than the one the resolver gives: %s.\n"
1550                                "Please fix and re-run this command." %
1551                                (node, msg))
1552
1553     # Distribute updated /etc/hosts and known_hosts to all nodes,
1554     # including the node just added
1555     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1556     dist_nodes = self.cfg.GetNodeList() + [node]
1557     if myself.name in dist_nodes:
1558       dist_nodes.remove(myself.name)
1559
1560     logger.Debug("Copying hosts and known_hosts to all nodes")
1561     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1562       result = rpc.call_upload_file(dist_nodes, fname)
1563       for to_node in dist_nodes:
1564         if not result[to_node]:
1565           logger.Error("copy of file %s to node %s failed" %
1566                        (fname, to_node))
1567
1568     to_copy = ss.GetFileList()
1569     for fname in to_copy:
1570       if not ssh.CopyFileToNode(node, fname):
1571         logger.Error("could not copy file %s to node %s" % (fname, node))
1572
1573     logger.Info("adding node %s to cluster.conf" % node)
1574     self.cfg.AddNode(new_node)
1575
1576
1577 class LUMasterFailover(LogicalUnit):
1578   """Failover the master node to the current node.
1579
1580   This is a special LU in that it must run on a non-master node.
1581
1582   """
1583   HPATH = "master-failover"
1584   HTYPE = constants.HTYPE_CLUSTER
1585   REQ_MASTER = False
1586   _OP_REQP = []
1587
1588   def BuildHooksEnv(self):
1589     """Build hooks env.
1590
1591     This will run on the new master only in the pre phase, and on all
1592     the nodes in the post phase.
1593
1594     """
1595     env = {
1596       "OP_TARGET": self.new_master,
1597       "NEW_MASTER": self.new_master,
1598       "OLD_MASTER": self.old_master,
1599       }
1600     return env, [self.new_master], self.cfg.GetNodeList()
1601
1602   def CheckPrereq(self):
1603     """Check prerequisites.
1604
1605     This checks that we are not already the master.
1606
1607     """
1608     self.new_master = utils.HostInfo().name
1609     self.old_master = self.sstore.GetMasterNode()
1610
1611     if self.old_master == self.new_master:
1612       raise errors.OpPrereqError("This commands must be run on the node"
1613                                  " where you want the new master to be.\n"
1614                                  "%s is already the master" %
1615                                  self.old_master)
1616
1617   def Exec(self, feedback_fn):
1618     """Failover the master node.
1619
1620     This command, when run on a non-master node, will cause the current
1621     master to cease being master, and the non-master to become new
1622     master.
1623
1624     """
1625     #TODO: do not rely on gethostname returning the FQDN
1626     logger.Info("setting master to %s, old master: %s" %
1627                 (self.new_master, self.old_master))
1628
1629     if not rpc.call_node_stop_master(self.old_master):
1630       logger.Error("could disable the master role on the old master"
1631                    " %s, please disable manually" % self.old_master)
1632
1633     ss = self.sstore
1634     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1635     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1636                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1637       logger.Error("could not distribute the new simple store master file"
1638                    " to the other nodes, please check.")
1639
1640     if not rpc.call_node_start_master(self.new_master):
1641       logger.Error("could not start the master role on the new master"
1642                    " %s, please check" % self.new_master)
1643       feedback_fn("Error in activating the master IP on the new master,\n"
1644                   "please fix manually.")
1645
1646
1647
1648 class LUQueryClusterInfo(NoHooksLU):
1649   """Query cluster configuration.
1650
1651   """
1652   _OP_REQP = []
1653   REQ_MASTER = False
1654
1655   def CheckPrereq(self):
1656     """No prerequsites needed for this LU.
1657
1658     """
1659     pass
1660
1661   def Exec(self, feedback_fn):
1662     """Return cluster config.
1663
1664     """
1665     result = {
1666       "name": self.sstore.GetClusterName(),
1667       "software_version": constants.RELEASE_VERSION,
1668       "protocol_version": constants.PROTOCOL_VERSION,
1669       "config_version": constants.CONFIG_VERSION,
1670       "os_api_version": constants.OS_API_VERSION,
1671       "export_version": constants.EXPORT_VERSION,
1672       "master": self.sstore.GetMasterNode(),
1673       "architecture": (platform.architecture()[0], platform.machine()),
1674       }
1675
1676     return result
1677
1678
1679 class LUClusterCopyFile(NoHooksLU):
1680   """Copy file to cluster.
1681
1682   """
1683   _OP_REQP = ["nodes", "filename"]
1684
1685   def CheckPrereq(self):
1686     """Check prerequisites.
1687
1688     It should check that the named file exists and that the given list
1689     of nodes is valid.
1690
1691     """
1692     if not os.path.exists(self.op.filename):
1693       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1694
1695     self.nodes = _GetWantedNodes(self, self.op.nodes)
1696
1697   def Exec(self, feedback_fn):
1698     """Copy a file from master to some nodes.
1699
1700     Args:
1701       opts - class with options as members
1702       args - list containing a single element, the file name
1703     Opts used:
1704       nodes - list containing the name of target nodes; if empty, all nodes
1705
1706     """
1707     filename = self.op.filename
1708
1709     myname = utils.HostInfo().name
1710
1711     for node in self.nodes:
1712       if node == myname:
1713         continue
1714       if not ssh.CopyFileToNode(node, filename):
1715         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1716
1717
1718 class LUDumpClusterConfig(NoHooksLU):
1719   """Return a text-representation of the cluster-config.
1720
1721   """
1722   _OP_REQP = []
1723
1724   def CheckPrereq(self):
1725     """No prerequisites.
1726
1727     """
1728     pass
1729
1730   def Exec(self, feedback_fn):
1731     """Dump a representation of the cluster config to the standard output.
1732
1733     """
1734     return self.cfg.DumpConfig()
1735
1736
1737 class LURunClusterCommand(NoHooksLU):
1738   """Run a command on some nodes.
1739
1740   """
1741   _OP_REQP = ["command", "nodes"]
1742
1743   def CheckPrereq(self):
1744     """Check prerequisites.
1745
1746     It checks that the given list of nodes is valid.
1747
1748     """
1749     self.nodes = _GetWantedNodes(self, self.op.nodes)
1750
1751   def Exec(self, feedback_fn):
1752     """Run a command on some nodes.
1753
1754     """
1755     data = []
1756     for node in self.nodes:
1757       result = ssh.SSHCall(node, "root", self.op.command)
1758       data.append((node, result.output, result.exit_code))
1759
1760     return data
1761
1762
1763 class LUActivateInstanceDisks(NoHooksLU):
1764   """Bring up an instance's disks.
1765
1766   """
1767   _OP_REQP = ["instance_name"]
1768
1769   def CheckPrereq(self):
1770     """Check prerequisites.
1771
1772     This checks that the instance is in the cluster.
1773
1774     """
1775     instance = self.cfg.GetInstanceInfo(
1776       self.cfg.ExpandInstanceName(self.op.instance_name))
1777     if instance is None:
1778       raise errors.OpPrereqError("Instance '%s' not known" %
1779                                  self.op.instance_name)
1780     self.instance = instance
1781
1782
1783   def Exec(self, feedback_fn):
1784     """Activate the disks.
1785
1786     """
1787     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1788     if not disks_ok:
1789       raise errors.OpExecError("Cannot activate block devices")
1790
1791     return disks_info
1792
1793
1794 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1795   """Prepare the block devices for an instance.
1796
1797   This sets up the block devices on all nodes.
1798
1799   Args:
1800     instance: a ganeti.objects.Instance object
1801     ignore_secondaries: if true, errors on secondary nodes won't result
1802                         in an error return from the function
1803
1804   Returns:
1805     false if the operation failed
1806     list of (host, instance_visible_name, node_visible_name) if the operation
1807          suceeded with the mapping from node devices to instance devices
1808   """
1809   device_info = []
1810   disks_ok = True
1811   for inst_disk in instance.disks:
1812     master_result = None
1813     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1814       cfg.SetDiskID(node_disk, node)
1815       is_primary = node == instance.primary_node
1816       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1817       if not result:
1818         logger.Error("could not prepare block device %s on node %s (is_pri"
1819                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1820         if is_primary or not ignore_secondaries:
1821           disks_ok = False
1822       if is_primary:
1823         master_result = result
1824     device_info.append((instance.primary_node, inst_disk.iv_name,
1825                         master_result))
1826
1827   return disks_ok, device_info
1828
1829
1830 def _StartInstanceDisks(cfg, instance, force):
1831   """Start the disks of an instance.
1832
1833   """
1834   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1835                                            ignore_secondaries=force)
1836   if not disks_ok:
1837     _ShutdownInstanceDisks(instance, cfg)
1838     if force is not None and not force:
1839       logger.Error("If the message above refers to a secondary node,"
1840                    " you can retry the operation using '--force'.")
1841     raise errors.OpExecError("Disk consistency error")
1842
1843
1844 class LUDeactivateInstanceDisks(NoHooksLU):
1845   """Shutdown an instance's disks.
1846
1847   """
1848   _OP_REQP = ["instance_name"]
1849
1850   def CheckPrereq(self):
1851     """Check prerequisites.
1852
1853     This checks that the instance is in the cluster.
1854
1855     """
1856     instance = self.cfg.GetInstanceInfo(
1857       self.cfg.ExpandInstanceName(self.op.instance_name))
1858     if instance is None:
1859       raise errors.OpPrereqError("Instance '%s' not known" %
1860                                  self.op.instance_name)
1861     self.instance = instance
1862
1863   def Exec(self, feedback_fn):
1864     """Deactivate the disks
1865
1866     """
1867     instance = self.instance
1868     ins_l = rpc.call_instance_list([instance.primary_node])
1869     ins_l = ins_l[instance.primary_node]
1870     if not type(ins_l) is list:
1871       raise errors.OpExecError("Can't contact node '%s'" %
1872                                instance.primary_node)
1873
1874     if self.instance.name in ins_l:
1875       raise errors.OpExecError("Instance is running, can't shutdown"
1876                                " block devices.")
1877
1878     _ShutdownInstanceDisks(instance, self.cfg)
1879
1880
1881 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1882   """Shutdown block devices of an instance.
1883
1884   This does the shutdown on all nodes of the instance.
1885
1886   If the ignore_primary is false, errors on the primary node are
1887   ignored.
1888
1889   """
1890   result = True
1891   for disk in instance.disks:
1892     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1893       cfg.SetDiskID(top_disk, node)
1894       if not rpc.call_blockdev_shutdown(node, top_disk):
1895         logger.Error("could not shutdown block device %s on node %s" %
1896                      (disk.iv_name, node))
1897         if not ignore_primary or node != instance.primary_node:
1898           result = False
1899   return result
1900
1901
1902 class LUStartupInstance(LogicalUnit):
1903   """Starts an instance.
1904
1905   """
1906   HPATH = "instance-start"
1907   HTYPE = constants.HTYPE_INSTANCE
1908   _OP_REQP = ["instance_name", "force"]
1909
1910   def BuildHooksEnv(self):
1911     """Build hooks env.
1912
1913     This runs on master, primary and secondary nodes of the instance.
1914
1915     """
1916     env = {
1917       "FORCE": self.op.force,
1918       }
1919     env.update(_BuildInstanceHookEnvByObject(self.instance))
1920     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1921           list(self.instance.secondary_nodes))
1922     return env, nl, nl
1923
1924   def CheckPrereq(self):
1925     """Check prerequisites.
1926
1927     This checks that the instance is in the cluster.
1928
1929     """
1930     instance = self.cfg.GetInstanceInfo(
1931       self.cfg.ExpandInstanceName(self.op.instance_name))
1932     if instance is None:
1933       raise errors.OpPrereqError("Instance '%s' not known" %
1934                                  self.op.instance_name)
1935
1936     # check bridges existance
1937     brlist = [nic.bridge for nic in instance.nics]
1938     if not rpc.call_bridges_exist(instance.primary_node, brlist):
1939       raise errors.OpPrereqError("one or more target bridges %s does not"
1940                                  " exist on destination node '%s'" %
1941                                  (brlist, instance.primary_node))
1942
1943     self.instance = instance
1944     self.op.instance_name = instance.name
1945
1946   def Exec(self, feedback_fn):
1947     """Start the instance.
1948
1949     """
1950     instance = self.instance
1951     force = self.op.force
1952     extra_args = getattr(self.op, "extra_args", "")
1953
1954     node_current = instance.primary_node
1955
1956     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1957     if not nodeinfo:
1958       raise errors.OpExecError("Could not contact node %s for infos" %
1959                                (node_current))
1960
1961     freememory = nodeinfo[node_current]['memory_free']
1962     memory = instance.memory
1963     if memory > freememory:
1964       raise errors.OpExecError("Not enough memory to start instance"
1965                                " %s on node %s"
1966                                " needed %s MiB, available %s MiB" %
1967                                (instance.name, node_current, memory,
1968                                 freememory))
1969
1970     _StartInstanceDisks(self.cfg, instance, force)
1971
1972     if not rpc.call_instance_start(node_current, instance, extra_args):
1973       _ShutdownInstanceDisks(instance, self.cfg)
1974       raise errors.OpExecError("Could not start instance")
1975
1976     self.cfg.MarkInstanceUp(instance.name)
1977
1978
1979 class LUShutdownInstance(LogicalUnit):
1980   """Shutdown an instance.
1981
1982   """
1983   HPATH = "instance-stop"
1984   HTYPE = constants.HTYPE_INSTANCE
1985   _OP_REQP = ["instance_name"]
1986
1987   def BuildHooksEnv(self):
1988     """Build hooks env.
1989
1990     This runs on master, primary and secondary nodes of the instance.
1991
1992     """
1993     env = _BuildInstanceHookEnvByObject(self.instance)
1994     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1995           list(self.instance.secondary_nodes))
1996     return env, nl, nl
1997
1998   def CheckPrereq(self):
1999     """Check prerequisites.
2000
2001     This checks that the instance is in the cluster.
2002
2003     """
2004     instance = self.cfg.GetInstanceInfo(
2005       self.cfg.ExpandInstanceName(self.op.instance_name))
2006     if instance is None:
2007       raise errors.OpPrereqError("Instance '%s' not known" %
2008                                  self.op.instance_name)
2009     self.instance = instance
2010
2011   def Exec(self, feedback_fn):
2012     """Shutdown the instance.
2013
2014     """
2015     instance = self.instance
2016     node_current = instance.primary_node
2017     if not rpc.call_instance_shutdown(node_current, instance):
2018       logger.Error("could not shutdown instance")
2019
2020     self.cfg.MarkInstanceDown(instance.name)
2021     _ShutdownInstanceDisks(instance, self.cfg)
2022
2023
2024 class LUReinstallInstance(LogicalUnit):
2025   """Reinstall an instance.
2026
2027   """
2028   HPATH = "instance-reinstall"
2029   HTYPE = constants.HTYPE_INSTANCE
2030   _OP_REQP = ["instance_name"]
2031
2032   def BuildHooksEnv(self):
2033     """Build hooks env.
2034
2035     This runs on master, primary and secondary nodes of the instance.
2036
2037     """
2038     env = _BuildInstanceHookEnvByObject(self.instance)
2039     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2040           list(self.instance.secondary_nodes))
2041     return env, nl, nl
2042
2043   def CheckPrereq(self):
2044     """Check prerequisites.
2045
2046     This checks that the instance is in the cluster and is not running.
2047
2048     """
2049     instance = self.cfg.GetInstanceInfo(
2050       self.cfg.ExpandInstanceName(self.op.instance_name))
2051     if instance is None:
2052       raise errors.OpPrereqError("Instance '%s' not known" %
2053                                  self.op.instance_name)
2054     if instance.disk_template == constants.DT_DISKLESS:
2055       raise errors.OpPrereqError("Instance '%s' has no disks" %
2056                                  self.op.instance_name)
2057     if instance.status != "down":
2058       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2059                                  self.op.instance_name)
2060     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2061     if remote_info:
2062       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2063                                  (self.op.instance_name,
2064                                   instance.primary_node))
2065
2066     self.op.os_type = getattr(self.op, "os_type", None)
2067     if self.op.os_type is not None:
2068       # OS verification
2069       pnode = self.cfg.GetNodeInfo(
2070         self.cfg.ExpandNodeName(instance.primary_node))
2071       if pnode is None:
2072         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2073                                    self.op.pnode)
2074       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2075       if not isinstance(os_obj, objects.OS):
2076         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2077                                    " primary node"  % self.op.os_type)
2078
2079     self.instance = instance
2080
2081   def Exec(self, feedback_fn):
2082     """Reinstall the instance.
2083
2084     """
2085     inst = self.instance
2086
2087     if self.op.os_type is not None:
2088       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2089       inst.os = self.op.os_type
2090       self.cfg.AddInstance(inst)
2091
2092     _StartInstanceDisks(self.cfg, inst, None)
2093     try:
2094       feedback_fn("Running the instance OS create scripts...")
2095       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2096         raise errors.OpExecError("Could not install OS for instance %s "
2097                                  "on node %s" %
2098                                  (inst.name, inst.primary_node))
2099     finally:
2100       _ShutdownInstanceDisks(inst, self.cfg)
2101
2102
2103 class LURenameInstance(LogicalUnit):
2104   """Rename an instance.
2105
2106   """
2107   HPATH = "instance-rename"
2108   HTYPE = constants.HTYPE_INSTANCE
2109   _OP_REQP = ["instance_name", "new_name"]
2110
2111   def BuildHooksEnv(self):
2112     """Build hooks env.
2113
2114     This runs on master, primary and secondary nodes of the instance.
2115
2116     """
2117     env = _BuildInstanceHookEnvByObject(self.instance)
2118     env["INSTANCE_NEW_NAME"] = self.op.new_name
2119     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2120           list(self.instance.secondary_nodes))
2121     return env, nl, nl
2122
2123   def CheckPrereq(self):
2124     """Check prerequisites.
2125
2126     This checks that the instance is in the cluster and is not running.
2127
2128     """
2129     instance = self.cfg.GetInstanceInfo(
2130       self.cfg.ExpandInstanceName(self.op.instance_name))
2131     if instance is None:
2132       raise errors.OpPrereqError("Instance '%s' not known" %
2133                                  self.op.instance_name)
2134     if instance.status != "down":
2135       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2136                                  self.op.instance_name)
2137     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2138     if remote_info:
2139       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2140                                  (self.op.instance_name,
2141                                   instance.primary_node))
2142     self.instance = instance
2143
2144     # new name verification
2145     name_info = utils.HostInfo(self.op.new_name)
2146
2147     self.op.new_name = new_name = name_info.name
2148     if not getattr(self.op, "ignore_ip", False):
2149       command = ["fping", "-q", name_info.ip]
2150       result = utils.RunCmd(command)
2151       if not result.failed:
2152         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2153                                    (name_info.ip, new_name))
2154
2155
2156   def Exec(self, feedback_fn):
2157     """Reinstall the instance.
2158
2159     """
2160     inst = self.instance
2161     old_name = inst.name
2162
2163     self.cfg.RenameInstance(inst.name, self.op.new_name)
2164
2165     # re-read the instance from the configuration after rename
2166     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2167
2168     _StartInstanceDisks(self.cfg, inst, None)
2169     try:
2170       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2171                                           "sda", "sdb"):
2172         msg = ("Could run OS rename script for instance %s\n"
2173                "on node %s\n"
2174                "(but the instance has been renamed in Ganeti)" %
2175                (inst.name, inst.primary_node))
2176         logger.Error(msg)
2177     finally:
2178       _ShutdownInstanceDisks(inst, self.cfg)
2179
2180
2181 class LURemoveInstance(LogicalUnit):
2182   """Remove an instance.
2183
2184   """
2185   HPATH = "instance-remove"
2186   HTYPE = constants.HTYPE_INSTANCE
2187   _OP_REQP = ["instance_name"]
2188
2189   def BuildHooksEnv(self):
2190     """Build hooks env.
2191
2192     This runs on master, primary and secondary nodes of the instance.
2193
2194     """
2195     env = _BuildInstanceHookEnvByObject(self.instance)
2196     nl = [self.sstore.GetMasterNode()]
2197     return env, nl, nl
2198
2199   def CheckPrereq(self):
2200     """Check prerequisites.
2201
2202     This checks that the instance is in the cluster.
2203
2204     """
2205     instance = self.cfg.GetInstanceInfo(
2206       self.cfg.ExpandInstanceName(self.op.instance_name))
2207     if instance is None:
2208       raise errors.OpPrereqError("Instance '%s' not known" %
2209                                  self.op.instance_name)
2210     self.instance = instance
2211
2212   def Exec(self, feedback_fn):
2213     """Remove the instance.
2214
2215     """
2216     instance = self.instance
2217     logger.Info("shutting down instance %s on node %s" %
2218                 (instance.name, instance.primary_node))
2219
2220     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2221       if self.op.ignore_failures:
2222         feedback_fn("Warning: can't shutdown instance")
2223       else:
2224         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2225                                  (instance.name, instance.primary_node))
2226
2227     logger.Info("removing block devices for instance %s" % instance.name)
2228
2229     if not _RemoveDisks(instance, self.cfg):
2230       if self.op.ignore_failures:
2231         feedback_fn("Warning: can't remove instance's disks")
2232       else:
2233         raise errors.OpExecError("Can't remove instance's disks")
2234
2235     logger.Info("removing instance %s out of cluster config" % instance.name)
2236
2237     self.cfg.RemoveInstance(instance.name)
2238
2239
2240 class LUQueryInstances(NoHooksLU):
2241   """Logical unit for querying instances.
2242
2243   """
2244   _OP_REQP = ["output_fields", "names"]
2245
2246   def CheckPrereq(self):
2247     """Check prerequisites.
2248
2249     This checks that the fields required are valid output fields.
2250
2251     """
2252     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2253     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2254                                "admin_state", "admin_ram",
2255                                "disk_template", "ip", "mac", "bridge",
2256                                "sda_size", "sdb_size"],
2257                        dynamic=self.dynamic_fields,
2258                        selected=self.op.output_fields)
2259
2260     self.wanted = _GetWantedInstances(self, self.op.names)
2261
2262   def Exec(self, feedback_fn):
2263     """Computes the list of nodes and their attributes.
2264
2265     """
2266     instance_names = self.wanted
2267     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2268                      in instance_names]
2269
2270     # begin data gathering
2271
2272     nodes = frozenset([inst.primary_node for inst in instance_list])
2273
2274     bad_nodes = []
2275     if self.dynamic_fields.intersection(self.op.output_fields):
2276       live_data = {}
2277       node_data = rpc.call_all_instances_info(nodes)
2278       for name in nodes:
2279         result = node_data[name]
2280         if result:
2281           live_data.update(result)
2282         elif result == False:
2283           bad_nodes.append(name)
2284         # else no instance is alive
2285     else:
2286       live_data = dict([(name, {}) for name in instance_names])
2287
2288     # end data gathering
2289
2290     output = []
2291     for instance in instance_list:
2292       iout = []
2293       for field in self.op.output_fields:
2294         if field == "name":
2295           val = instance.name
2296         elif field == "os":
2297           val = instance.os
2298         elif field == "pnode":
2299           val = instance.primary_node
2300         elif field == "snodes":
2301           val = list(instance.secondary_nodes)
2302         elif field == "admin_state":
2303           val = (instance.status != "down")
2304         elif field == "oper_state":
2305           if instance.primary_node in bad_nodes:
2306             val = None
2307           else:
2308             val = bool(live_data.get(instance.name))
2309         elif field == "admin_ram":
2310           val = instance.memory
2311         elif field == "oper_ram":
2312           if instance.primary_node in bad_nodes:
2313             val = None
2314           elif instance.name in live_data:
2315             val = live_data[instance.name].get("memory", "?")
2316           else:
2317             val = "-"
2318         elif field == "disk_template":
2319           val = instance.disk_template
2320         elif field == "ip":
2321           val = instance.nics[0].ip
2322         elif field == "bridge":
2323           val = instance.nics[0].bridge
2324         elif field == "mac":
2325           val = instance.nics[0].mac
2326         elif field == "sda_size" or field == "sdb_size":
2327           disk = instance.FindDisk(field[:3])
2328           if disk is None:
2329             val = None
2330           else:
2331             val = disk.size
2332         else:
2333           raise errors.ParameterError(field)
2334         iout.append(val)
2335       output.append(iout)
2336
2337     return output
2338
2339
2340 class LUFailoverInstance(LogicalUnit):
2341   """Failover an instance.
2342
2343   """
2344   HPATH = "instance-failover"
2345   HTYPE = constants.HTYPE_INSTANCE
2346   _OP_REQP = ["instance_name", "ignore_consistency"]
2347
2348   def BuildHooksEnv(self):
2349     """Build hooks env.
2350
2351     This runs on master, primary and secondary nodes of the instance.
2352
2353     """
2354     env = {
2355       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2356       }
2357     env.update(_BuildInstanceHookEnvByObject(self.instance))
2358     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2359     return env, nl, nl
2360
2361   def CheckPrereq(self):
2362     """Check prerequisites.
2363
2364     This checks that the instance is in the cluster.
2365
2366     """
2367     instance = self.cfg.GetInstanceInfo(
2368       self.cfg.ExpandInstanceName(self.op.instance_name))
2369     if instance is None:
2370       raise errors.OpPrereqError("Instance '%s' not known" %
2371                                  self.op.instance_name)
2372
2373     if instance.disk_template != constants.DT_REMOTE_RAID1:
2374       raise errors.OpPrereqError("Instance's disk layout is not"
2375                                  " remote_raid1.")
2376
2377     secondary_nodes = instance.secondary_nodes
2378     if not secondary_nodes:
2379       raise errors.ProgrammerError("no secondary node but using "
2380                                    "DT_REMOTE_RAID1 template")
2381
2382     # check memory requirements on the secondary node
2383     target_node = secondary_nodes[0]
2384     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2385     info = nodeinfo.get(target_node, None)
2386     if not info:
2387       raise errors.OpPrereqError("Cannot get current information"
2388                                  " from node '%s'" % nodeinfo)
2389     if instance.memory > info['memory_free']:
2390       raise errors.OpPrereqError("Not enough memory on target node %s."
2391                                  " %d MB available, %d MB required" %
2392                                  (target_node, info['memory_free'],
2393                                   instance.memory))
2394
2395     # check bridge existance
2396     brlist = [nic.bridge for nic in instance.nics]
2397     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2398       raise errors.OpPrereqError("One or more target bridges %s does not"
2399                                  " exist on destination node '%s'" %
2400                                  (brlist, instance.primary_node))
2401
2402     self.instance = instance
2403
2404   def Exec(self, feedback_fn):
2405     """Failover an instance.
2406
2407     The failover is done by shutting it down on its present node and
2408     starting it on the secondary.
2409
2410     """
2411     instance = self.instance
2412
2413     source_node = instance.primary_node
2414     target_node = instance.secondary_nodes[0]
2415
2416     feedback_fn("* checking disk consistency between source and target")
2417     for dev in instance.disks:
2418       # for remote_raid1, these are md over drbd
2419       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2420         if not self.op.ignore_consistency:
2421           raise errors.OpExecError("Disk %s is degraded on target node,"
2422                                    " aborting failover." % dev.iv_name)
2423
2424     feedback_fn("* checking target node resource availability")
2425     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2426
2427     if not nodeinfo:
2428       raise errors.OpExecError("Could not contact target node %s." %
2429                                target_node)
2430
2431     free_memory = int(nodeinfo[target_node]['memory_free'])
2432     memory = instance.memory
2433     if memory > free_memory:
2434       raise errors.OpExecError("Not enough memory to create instance %s on"
2435                                " node %s. needed %s MiB, available %s MiB" %
2436                                (instance.name, target_node, memory,
2437                                 free_memory))
2438
2439     feedback_fn("* shutting down instance on source node")
2440     logger.Info("Shutting down instance %s on node %s" %
2441                 (instance.name, source_node))
2442
2443     if not rpc.call_instance_shutdown(source_node, instance):
2444       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2445                    " anyway. Please make sure node %s is down"  %
2446                    (instance.name, source_node, source_node))
2447
2448     feedback_fn("* deactivating the instance's disks on source node")
2449     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2450       raise errors.OpExecError("Can't shut down the instance's disks.")
2451
2452     instance.primary_node = target_node
2453     # distribute new instance config to the other nodes
2454     self.cfg.AddInstance(instance)
2455
2456     feedback_fn("* activating the instance's disks on target node")
2457     logger.Info("Starting instance %s on node %s" %
2458                 (instance.name, target_node))
2459
2460     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2461                                              ignore_secondaries=True)
2462     if not disks_ok:
2463       _ShutdownInstanceDisks(instance, self.cfg)
2464       raise errors.OpExecError("Can't activate the instance's disks")
2465
2466     feedback_fn("* starting the instance on the target node")
2467     if not rpc.call_instance_start(target_node, instance, None):
2468       _ShutdownInstanceDisks(instance, self.cfg)
2469       raise errors.OpExecError("Could not start instance %s on node %s." %
2470                                (instance.name, target_node))
2471
2472
2473 def _CreateBlockDevOnPrimary(cfg, node, device, info):
2474   """Create a tree of block devices on the primary node.
2475
2476   This always creates all devices.
2477
2478   """
2479   if device.children:
2480     for child in device.children:
2481       if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2482         return False
2483
2484   cfg.SetDiskID(device, node)
2485   new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2486   if not new_id:
2487     return False
2488   if device.physical_id is None:
2489     device.physical_id = new_id
2490   return True
2491
2492
2493 def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2494   """Create a tree of block devices on a secondary node.
2495
2496   If this device type has to be created on secondaries, create it and
2497   all its children.
2498
2499   If not, just recurse to children keeping the same 'force' value.
2500
2501   """
2502   if device.CreateOnSecondary():
2503     force = True
2504   if device.children:
2505     for child in device.children:
2506       if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2507         return False
2508
2509   if not force:
2510     return True
2511   cfg.SetDiskID(device, node)
2512   new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2513   if not new_id:
2514     return False
2515   if device.physical_id is None:
2516     device.physical_id = new_id
2517   return True
2518
2519
2520 def _GenerateUniqueNames(cfg, exts):
2521   """Generate a suitable LV name.
2522
2523   This will generate a logical volume name for the given instance.
2524
2525   """
2526   results = []
2527   for val in exts:
2528     new_id = cfg.GenerateUniqueID()
2529     results.append("%s%s" % (new_id, val))
2530   return results
2531
2532
2533 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2534   """Generate a drbd device complete with its children.
2535
2536   """
2537   port = cfg.AllocatePort()
2538   vgname = cfg.GetVGName()
2539   dev_data = objects.Disk(dev_type="lvm", size=size,
2540                           logical_id=(vgname, names[0]))
2541   dev_meta = objects.Disk(dev_type="lvm", size=128,
2542                           logical_id=(vgname, names[1]))
2543   drbd_dev = objects.Disk(dev_type="drbd", size=size,
2544                           logical_id = (primary, secondary, port),
2545                           children = [dev_data, dev_meta])
2546   return drbd_dev
2547
2548
2549 def _GenerateDiskTemplate(cfg, template_name,
2550                           instance_name, primary_node,
2551                           secondary_nodes, disk_sz, swap_sz):
2552   """Generate the entire disk layout for a given template type.
2553
2554   """
2555   #TODO: compute space requirements
2556
2557   vgname = cfg.GetVGName()
2558   if template_name == "diskless":
2559     disks = []
2560   elif template_name == "plain":
2561     if len(secondary_nodes) != 0:
2562       raise errors.ProgrammerError("Wrong template configuration")
2563
2564     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2565     sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2566                            logical_id=(vgname, names[0]),
2567                            iv_name = "sda")
2568     sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2569                            logical_id=(vgname, names[1]),
2570                            iv_name = "sdb")
2571     disks = [sda_dev, sdb_dev]
2572   elif template_name == "local_raid1":
2573     if len(secondary_nodes) != 0:
2574       raise errors.ProgrammerError("Wrong template configuration")
2575
2576
2577     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2578                                        ".sdb_m1", ".sdb_m2"])
2579     sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2580                               logical_id=(vgname, names[0]))
2581     sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2582                               logical_id=(vgname, names[1]))
2583     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2584                               size=disk_sz,
2585                               children = [sda_dev_m1, sda_dev_m2])
2586     sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2587                               logical_id=(vgname, names[2]))
2588     sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2589                               logical_id=(vgname, names[3]))
2590     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2591                               size=swap_sz,
2592                               children = [sdb_dev_m1, sdb_dev_m2])
2593     disks = [md_sda_dev, md_sdb_dev]
2594   elif template_name == constants.DT_REMOTE_RAID1:
2595     if len(secondary_nodes) != 1:
2596       raise errors.ProgrammerError("Wrong template configuration")
2597     remote_node = secondary_nodes[0]
2598     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2599                                        ".sdb_data", ".sdb_meta"])
2600     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2601                                          disk_sz, names[0:2])
2602     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2603                               children = [drbd_sda_dev], size=disk_sz)
2604     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2605                                          swap_sz, names[2:4])
2606     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2607                               children = [drbd_sdb_dev], size=swap_sz)
2608     disks = [md_sda_dev, md_sdb_dev]
2609   else:
2610     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2611   return disks
2612
2613
2614 def _GetInstanceInfoText(instance):
2615   """Compute that text that should be added to the disk's metadata.
2616
2617   """
2618   return "originstname+%s" % instance.name
2619
2620
2621 def _CreateDisks(cfg, instance):
2622   """Create all disks for an instance.
2623
2624   This abstracts away some work from AddInstance.
2625
2626   Args:
2627     instance: the instance object
2628
2629   Returns:
2630     True or False showing the success of the creation process
2631
2632   """
2633   info = _GetInstanceInfoText(instance)
2634
2635   for device in instance.disks:
2636     logger.Info("creating volume %s for instance %s" %
2637               (device.iv_name, instance.name))
2638     #HARDCODE
2639     for secondary_node in instance.secondary_nodes:
2640       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2641                                         info):
2642         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2643                      (device.iv_name, device, secondary_node))
2644         return False
2645     #HARDCODE
2646     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2647       logger.Error("failed to create volume %s on primary!" %
2648                    device.iv_name)
2649       return False
2650   return True
2651
2652
2653 def _RemoveDisks(instance, cfg):
2654   """Remove all disks for an instance.
2655
2656   This abstracts away some work from `AddInstance()` and
2657   `RemoveInstance()`. Note that in case some of the devices couldn't
2658   be removed, the removal will continue with the other ones (compare
2659   with `_CreateDisks()`).
2660
2661   Args:
2662     instance: the instance object
2663
2664   Returns:
2665     True or False showing the success of the removal proces
2666
2667   """
2668   logger.Info("removing block devices for instance %s" % instance.name)
2669
2670   result = True
2671   for device in instance.disks:
2672     for node, disk in device.ComputeNodeTree(instance.primary_node):
2673       cfg.SetDiskID(disk, node)
2674       if not rpc.call_blockdev_remove(node, disk):
2675         logger.Error("could not remove block device %s on node %s,"
2676                      " continuing anyway" %
2677                      (device.iv_name, node))
2678         result = False
2679   return result
2680
2681
2682 class LUCreateInstance(LogicalUnit):
2683   """Create an instance.
2684
2685   """
2686   HPATH = "instance-add"
2687   HTYPE = constants.HTYPE_INSTANCE
2688   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2689               "disk_template", "swap_size", "mode", "start", "vcpus",
2690               "wait_for_sync", "ip_check"]
2691
2692   def BuildHooksEnv(self):
2693     """Build hooks env.
2694
2695     This runs on master, primary and secondary nodes of the instance.
2696
2697     """
2698     env = {
2699       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2700       "INSTANCE_DISK_SIZE": self.op.disk_size,
2701       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2702       "INSTANCE_ADD_MODE": self.op.mode,
2703       }
2704     if self.op.mode == constants.INSTANCE_IMPORT:
2705       env["INSTANCE_SRC_NODE"] = self.op.src_node
2706       env["INSTANCE_SRC_PATH"] = self.op.src_path
2707       env["INSTANCE_SRC_IMAGE"] = self.src_image
2708
2709     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2710       primary_node=self.op.pnode,
2711       secondary_nodes=self.secondaries,
2712       status=self.instance_status,
2713       os_type=self.op.os_type,
2714       memory=self.op.mem_size,
2715       vcpus=self.op.vcpus,
2716       nics=[(self.inst_ip, self.op.bridge)],
2717     ))
2718
2719     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2720           self.secondaries)
2721     return env, nl, nl
2722
2723
2724   def CheckPrereq(self):
2725     """Check prerequisites.
2726
2727     """
2728     if self.op.mode not in (constants.INSTANCE_CREATE,
2729                             constants.INSTANCE_IMPORT):
2730       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2731                                  self.op.mode)
2732
2733     if self.op.mode == constants.INSTANCE_IMPORT:
2734       src_node = getattr(self.op, "src_node", None)
2735       src_path = getattr(self.op, "src_path", None)
2736       if src_node is None or src_path is None:
2737         raise errors.OpPrereqError("Importing an instance requires source"
2738                                    " node and path options")
2739       src_node_full = self.cfg.ExpandNodeName(src_node)
2740       if src_node_full is None:
2741         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2742       self.op.src_node = src_node = src_node_full
2743
2744       if not os.path.isabs(src_path):
2745         raise errors.OpPrereqError("The source path must be absolute")
2746
2747       export_info = rpc.call_export_info(src_node, src_path)
2748
2749       if not export_info:
2750         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2751
2752       if not export_info.has_section(constants.INISECT_EXP):
2753         raise errors.ProgrammerError("Corrupted export config")
2754
2755       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2756       if (int(ei_version) != constants.EXPORT_VERSION):
2757         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2758                                    (ei_version, constants.EXPORT_VERSION))
2759
2760       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2761         raise errors.OpPrereqError("Can't import instance with more than"
2762                                    " one data disk")
2763
2764       # FIXME: are the old os-es, disk sizes, etc. useful?
2765       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2766       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2767                                                          'disk0_dump'))
2768       self.src_image = diskimage
2769     else: # INSTANCE_CREATE
2770       if getattr(self.op, "os_type", None) is None:
2771         raise errors.OpPrereqError("No guest OS specified")
2772
2773     # check primary node
2774     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2775     if pnode is None:
2776       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2777                                  self.op.pnode)
2778     self.op.pnode = pnode.name
2779     self.pnode = pnode
2780     self.secondaries = []
2781     # disk template and mirror node verification
2782     if self.op.disk_template not in constants.DISK_TEMPLATES:
2783       raise errors.OpPrereqError("Invalid disk template name")
2784
2785     if self.op.disk_template == constants.DT_REMOTE_RAID1:
2786       if getattr(self.op, "snode", None) is None:
2787         raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2788                                    " a mirror node")
2789
2790       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2791       if snode_name is None:
2792         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2793                                    self.op.snode)
2794       elif snode_name == pnode.name:
2795         raise errors.OpPrereqError("The secondary node cannot be"
2796                                    " the primary node.")
2797       self.secondaries.append(snode_name)
2798
2799     # Check lv size requirements
2800     nodenames = [pnode.name] + self.secondaries
2801     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2802
2803     # Required free disk space as a function of disk and swap space
2804     req_size_dict = {
2805       constants.DT_DISKLESS: 0,
2806       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2807       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2808       # 256 MB are added for drbd metadata, 128MB for each drbd device
2809       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2810     }
2811
2812     if self.op.disk_template not in req_size_dict:
2813       raise errors.ProgrammerError("Disk template '%s' size requirement"
2814                                    " is unknown" %  self.op.disk_template)
2815
2816     req_size = req_size_dict[self.op.disk_template]
2817
2818     for node in nodenames:
2819       info = nodeinfo.get(node, None)
2820       if not info:
2821         raise errors.OpPrereqError("Cannot get current information"
2822                                    " from node '%s'" % nodeinfo)
2823       if req_size > info['vg_free']:
2824         raise errors.OpPrereqError("Not enough disk space on target node %s."
2825                                    " %d MB available, %d MB required" %
2826                                    (node, info['vg_free'], req_size))
2827
2828     # os verification
2829     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2830     if not isinstance(os_obj, objects.OS):
2831       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2832                                  " primary node"  % self.op.os_type)
2833
2834     # instance verification
2835     hostname1 = utils.HostInfo(self.op.instance_name)
2836
2837     self.op.instance_name = instance_name = hostname1.name
2838     instance_list = self.cfg.GetInstanceList()
2839     if instance_name in instance_list:
2840       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2841                                  instance_name)
2842
2843     ip = getattr(self.op, "ip", None)
2844     if ip is None or ip.lower() == "none":
2845       inst_ip = None
2846     elif ip.lower() == "auto":
2847       inst_ip = hostname1.ip
2848     else:
2849       if not utils.IsValidIP(ip):
2850         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2851                                    " like a valid IP" % ip)
2852       inst_ip = ip
2853     self.inst_ip = inst_ip
2854
2855     if self.op.start and not self.op.ip_check:
2856       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2857                                  " adding an instance in start mode")
2858
2859     if self.op.ip_check:
2860       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2861                        constants.DEFAULT_NODED_PORT):
2862         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2863                                    (hostname1.ip, instance_name))
2864
2865     # bridge verification
2866     bridge = getattr(self.op, "bridge", None)
2867     if bridge is None:
2868       self.op.bridge = self.cfg.GetDefBridge()
2869     else:
2870       self.op.bridge = bridge
2871
2872     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2873       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2874                                  " destination node '%s'" %
2875                                  (self.op.bridge, pnode.name))
2876
2877     if self.op.start:
2878       self.instance_status = 'up'
2879     else:
2880       self.instance_status = 'down'
2881
2882   def Exec(self, feedback_fn):
2883     """Create and add the instance to the cluster.
2884
2885     """
2886     instance = self.op.instance_name
2887     pnode_name = self.pnode.name
2888
2889     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2890     if self.inst_ip is not None:
2891       nic.ip = self.inst_ip
2892
2893     disks = _GenerateDiskTemplate(self.cfg,
2894                                   self.op.disk_template,
2895                                   instance, pnode_name,
2896                                   self.secondaries, self.op.disk_size,
2897                                   self.op.swap_size)
2898
2899     iobj = objects.Instance(name=instance, os=self.op.os_type,
2900                             primary_node=pnode_name,
2901                             memory=self.op.mem_size,
2902                             vcpus=self.op.vcpus,
2903                             nics=[nic], disks=disks,
2904                             disk_template=self.op.disk_template,
2905                             status=self.instance_status,
2906                             )
2907
2908     feedback_fn("* creating instance disks...")
2909     if not _CreateDisks(self.cfg, iobj):
2910       _RemoveDisks(iobj, self.cfg)
2911       raise errors.OpExecError("Device creation failed, reverting...")
2912
2913     feedback_fn("adding instance %s to cluster config" % instance)
2914
2915     self.cfg.AddInstance(iobj)
2916
2917     if self.op.wait_for_sync:
2918       disk_abort = not _WaitForSync(self.cfg, iobj)
2919     elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2920       # make sure the disks are not degraded (still sync-ing is ok)
2921       time.sleep(15)
2922       feedback_fn("* checking mirrors status")
2923       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2924     else:
2925       disk_abort = False
2926
2927     if disk_abort:
2928       _RemoveDisks(iobj, self.cfg)
2929       self.cfg.RemoveInstance(iobj.name)
2930       raise errors.OpExecError("There are some degraded disks for"
2931                                " this instance")
2932
2933     feedback_fn("creating os for instance %s on node %s" %
2934                 (instance, pnode_name))
2935
2936     if iobj.disk_template != constants.DT_DISKLESS:
2937       if self.op.mode == constants.INSTANCE_CREATE:
2938         feedback_fn("* running the instance OS create scripts...")
2939         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2940           raise errors.OpExecError("could not add os for instance %s"
2941                                    " on node %s" %
2942                                    (instance, pnode_name))
2943
2944       elif self.op.mode == constants.INSTANCE_IMPORT:
2945         feedback_fn("* running the instance OS import scripts...")
2946         src_node = self.op.src_node
2947         src_image = self.src_image
2948         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2949                                                 src_node, src_image):
2950           raise errors.OpExecError("Could not import os for instance"
2951                                    " %s on node %s" %
2952                                    (instance, pnode_name))
2953       else:
2954         # also checked in the prereq part
2955         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2956                                      % self.op.mode)
2957
2958     if self.op.start:
2959       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2960       feedback_fn("* starting instance...")
2961       if not rpc.call_instance_start(pnode_name, iobj, None):
2962         raise errors.OpExecError("Could not start instance")
2963
2964
2965 class LUConnectConsole(NoHooksLU):
2966   """Connect to an instance's console.
2967
2968   This is somewhat special in that it returns the command line that
2969   you need to run on the master node in order to connect to the
2970   console.
2971
2972   """
2973   _OP_REQP = ["instance_name"]
2974
2975   def CheckPrereq(self):
2976     """Check prerequisites.
2977
2978     This checks that the instance is in the cluster.
2979
2980     """
2981     instance = self.cfg.GetInstanceInfo(
2982       self.cfg.ExpandInstanceName(self.op.instance_name))
2983     if instance is None:
2984       raise errors.OpPrereqError("Instance '%s' not known" %
2985                                  self.op.instance_name)
2986     self.instance = instance
2987
2988   def Exec(self, feedback_fn):
2989     """Connect to the console of an instance
2990
2991     """
2992     instance = self.instance
2993     node = instance.primary_node
2994
2995     node_insts = rpc.call_instance_list([node])[node]
2996     if node_insts is False:
2997       raise errors.OpExecError("Can't connect to node %s." % node)
2998
2999     if instance.name not in node_insts:
3000       raise errors.OpExecError("Instance %s is not running." % instance.name)
3001
3002     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3003
3004     hyper = hypervisor.GetHypervisor()
3005     console_cmd = hyper.GetShellCommandForConsole(instance.name)
3006     # build ssh cmdline
3007     argv = ["ssh", "-q", "-t"]
3008     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3009     argv.extend(ssh.BATCH_MODE_OPTS)
3010     argv.append(node)
3011     argv.append(console_cmd)
3012     return "ssh", argv
3013
3014
3015 class LUAddMDDRBDComponent(LogicalUnit):
3016   """Adda new mirror member to an instance's disk.
3017
3018   """
3019   HPATH = "mirror-add"
3020   HTYPE = constants.HTYPE_INSTANCE
3021   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3022
3023   def BuildHooksEnv(self):
3024     """Build hooks env.
3025
3026     This runs on the master, the primary and all the secondaries.
3027
3028     """
3029     env = {
3030       "NEW_SECONDARY": self.op.remote_node,
3031       "DISK_NAME": self.op.disk_name,
3032       }
3033     env.update(_BuildInstanceHookEnvByObject(self.instance))
3034     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3035           self.op.remote_node,] + list(self.instance.secondary_nodes)
3036     return env, nl, nl
3037
3038   def CheckPrereq(self):
3039     """Check prerequisites.
3040
3041     This checks that the instance is in the cluster.
3042
3043     """
3044     instance = self.cfg.GetInstanceInfo(
3045       self.cfg.ExpandInstanceName(self.op.instance_name))
3046     if instance is None:
3047       raise errors.OpPrereqError("Instance '%s' not known" %
3048                                  self.op.instance_name)
3049     self.instance = instance
3050
3051     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3052     if remote_node is None:
3053       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3054     self.remote_node = remote_node
3055
3056     if remote_node == instance.primary_node:
3057       raise errors.OpPrereqError("The specified node is the primary node of"
3058                                  " the instance.")
3059
3060     if instance.disk_template != constants.DT_REMOTE_RAID1:
3061       raise errors.OpPrereqError("Instance's disk layout is not"
3062                                  " remote_raid1.")
3063     for disk in instance.disks:
3064       if disk.iv_name == self.op.disk_name:
3065         break
3066     else:
3067       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3068                                  " instance." % self.op.disk_name)
3069     if len(disk.children) > 1:
3070       raise errors.OpPrereqError("The device already has two slave"
3071                                  " devices.\n"
3072                                  "This would create a 3-disk raid1"
3073                                  " which we don't allow.")
3074     self.disk = disk
3075
3076   def Exec(self, feedback_fn):
3077     """Add the mirror component
3078
3079     """
3080     disk = self.disk
3081     instance = self.instance
3082
3083     remote_node = self.remote_node
3084     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3085     names = _GenerateUniqueNames(self.cfg, lv_names)
3086     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3087                                      remote_node, disk.size, names)
3088
3089     logger.Info("adding new mirror component on secondary")
3090     #HARDCODE
3091     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
3092                                       _GetInstanceInfoText(instance)):
3093       raise errors.OpExecError("Failed to create new component on secondary"
3094                                " node %s" % remote_node)
3095
3096     logger.Info("adding new mirror component on primary")
3097     #HARDCODE
3098     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
3099                                     _GetInstanceInfoText(instance)):
3100       # remove secondary dev
3101       self.cfg.SetDiskID(new_drbd, remote_node)
3102       rpc.call_blockdev_remove(remote_node, new_drbd)
3103       raise errors.OpExecError("Failed to create volume on primary")
3104
3105     # the device exists now
3106     # call the primary node to add the mirror to md
3107     logger.Info("adding new mirror component to md")
3108     if not rpc.call_blockdev_addchild(instance.primary_node,
3109                                            disk, new_drbd):
3110       logger.Error("Can't add mirror compoment to md!")
3111       self.cfg.SetDiskID(new_drbd, remote_node)
3112       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3113         logger.Error("Can't rollback on secondary")
3114       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3115       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3116         logger.Error("Can't rollback on primary")
3117       raise errors.OpExecError("Can't add mirror component to md array")
3118
3119     disk.children.append(new_drbd)
3120
3121     self.cfg.AddInstance(instance)
3122
3123     _WaitForSync(self.cfg, instance)
3124
3125     return 0
3126
3127
3128 class LURemoveMDDRBDComponent(LogicalUnit):
3129   """Remove a component from a remote_raid1 disk.
3130
3131   """
3132   HPATH = "mirror-remove"
3133   HTYPE = constants.HTYPE_INSTANCE
3134   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3135
3136   def BuildHooksEnv(self):
3137     """Build hooks env.
3138
3139     This runs on the master, the primary and all the secondaries.
3140
3141     """
3142     env = {
3143       "DISK_NAME": self.op.disk_name,
3144       "DISK_ID": self.op.disk_id,
3145       "OLD_SECONDARY": self.old_secondary,
3146       }
3147     env.update(_BuildInstanceHookEnvByObject(self.instance))
3148     nl = [self.sstore.GetMasterNode(),
3149           self.instance.primary_node] + list(self.instance.secondary_nodes)
3150     return env, nl, nl
3151
3152   def CheckPrereq(self):
3153     """Check prerequisites.
3154
3155     This checks that the instance is in the cluster.
3156
3157     """
3158     instance = self.cfg.GetInstanceInfo(
3159       self.cfg.ExpandInstanceName(self.op.instance_name))
3160     if instance is None:
3161       raise errors.OpPrereqError("Instance '%s' not known" %
3162                                  self.op.instance_name)
3163     self.instance = instance
3164
3165     if instance.disk_template != constants.DT_REMOTE_RAID1:
3166       raise errors.OpPrereqError("Instance's disk layout is not"
3167                                  " remote_raid1.")
3168     for disk in instance.disks:
3169       if disk.iv_name == self.op.disk_name:
3170         break
3171     else:
3172       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3173                                  " instance." % self.op.disk_name)
3174     for child in disk.children:
3175       if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
3176         break
3177     else:
3178       raise errors.OpPrereqError("Can't find the device with this port.")
3179
3180     if len(disk.children) < 2:
3181       raise errors.OpPrereqError("Cannot remove the last component from"
3182                                  " a mirror.")
3183     self.disk = disk
3184     self.child = child
3185     if self.child.logical_id[0] == instance.primary_node:
3186       oid = 1
3187     else:
3188       oid = 0
3189     self.old_secondary = self.child.logical_id[oid]
3190
3191   def Exec(self, feedback_fn):
3192     """Remove the mirror component
3193
3194     """
3195     instance = self.instance
3196     disk = self.disk
3197     child = self.child
3198     logger.Info("remove mirror component")
3199     self.cfg.SetDiskID(disk, instance.primary_node)
3200     if not rpc.call_blockdev_removechild(instance.primary_node,
3201                                               disk, child):
3202       raise errors.OpExecError("Can't remove child from mirror.")
3203
3204     for node in child.logical_id[:2]:
3205       self.cfg.SetDiskID(child, node)
3206       if not rpc.call_blockdev_remove(node, child):
3207         logger.Error("Warning: failed to remove device from node %s,"
3208                      " continuing operation." % node)
3209
3210     disk.children.remove(child)
3211     self.cfg.AddInstance(instance)
3212
3213
3214 class LUReplaceDisks(LogicalUnit):
3215   """Replace the disks of an instance.
3216
3217   """
3218   HPATH = "mirrors-replace"
3219   HTYPE = constants.HTYPE_INSTANCE
3220   _OP_REQP = ["instance_name"]
3221
3222   def BuildHooksEnv(self):
3223     """Build hooks env.
3224
3225     This runs on the master, the primary and all the secondaries.
3226
3227     """
3228     env = {
3229       "NEW_SECONDARY": self.op.remote_node,
3230       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3231       }
3232     env.update(_BuildInstanceHookEnvByObject(self.instance))
3233     nl = [self.sstore.GetMasterNode(),
3234           self.instance.primary_node] + list(self.instance.secondary_nodes)
3235     return env, nl, nl
3236
3237   def CheckPrereq(self):
3238     """Check prerequisites.
3239
3240     This checks that the instance is in the cluster.
3241
3242     """
3243     instance = self.cfg.GetInstanceInfo(
3244       self.cfg.ExpandInstanceName(self.op.instance_name))
3245     if instance is None:
3246       raise errors.OpPrereqError("Instance '%s' not known" %
3247                                  self.op.instance_name)
3248     self.instance = instance
3249
3250     if instance.disk_template != constants.DT_REMOTE_RAID1:
3251       raise errors.OpPrereqError("Instance's disk layout is not"
3252                                  " remote_raid1.")
3253
3254     if len(instance.secondary_nodes) != 1:
3255       raise errors.OpPrereqError("The instance has a strange layout,"
3256                                  " expected one secondary but found %d" %
3257                                  len(instance.secondary_nodes))
3258
3259     remote_node = getattr(self.op, "remote_node", None)
3260     if remote_node is None:
3261       remote_node = instance.secondary_nodes[0]
3262     else:
3263       remote_node = self.cfg.ExpandNodeName(remote_node)
3264       if remote_node is None:
3265         raise errors.OpPrereqError("Node '%s' not known" %
3266                                    self.op.remote_node)
3267     if remote_node == instance.primary_node:
3268       raise errors.OpPrereqError("The specified node is the primary node of"
3269                                  " the instance.")
3270     self.op.remote_node = remote_node
3271
3272   def Exec(self, feedback_fn):
3273     """Replace the disks of an instance.
3274
3275     """
3276     instance = self.instance
3277     iv_names = {}
3278     # start of work
3279     remote_node = self.op.remote_node
3280     cfg = self.cfg
3281     for dev in instance.disks:
3282       size = dev.size
3283       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3284       names = _GenerateUniqueNames(cfg, lv_names)
3285       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3286                                        remote_node, size, names)
3287       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3288       logger.Info("adding new mirror component on secondary for %s" %
3289                   dev.iv_name)
3290       #HARDCODE
3291       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3292                                         _GetInstanceInfoText(instance)):
3293         raise errors.OpExecError("Failed to create new component on"
3294                                  " secondary node %s\n"
3295                                  "Full abort, cleanup manually!" %
3296                                  remote_node)
3297
3298       logger.Info("adding new mirror component on primary")
3299       #HARDCODE
3300       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3301                                       _GetInstanceInfoText(instance)):
3302         # remove secondary dev
3303         cfg.SetDiskID(new_drbd, remote_node)
3304         rpc.call_blockdev_remove(remote_node, new_drbd)
3305         raise errors.OpExecError("Failed to create volume on primary!\n"
3306                                  "Full abort, cleanup manually!!")
3307
3308       # the device exists now
3309       # call the primary node to add the mirror to md
3310       logger.Info("adding new mirror component to md")
3311       if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3312                                         new_drbd):
3313         logger.Error("Can't add mirror compoment to md!")
3314         cfg.SetDiskID(new_drbd, remote_node)
3315         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3316           logger.Error("Can't rollback on secondary")
3317         cfg.SetDiskID(new_drbd, instance.primary_node)
3318         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3319           logger.Error("Can't rollback on primary")
3320         raise errors.OpExecError("Full abort, cleanup manually!!")
3321
3322       dev.children.append(new_drbd)
3323       cfg.AddInstance(instance)
3324
3325     # this can fail as the old devices are degraded and _WaitForSync
3326     # does a combined result over all disks, so we don't check its
3327     # return value
3328     _WaitForSync(cfg, instance, unlock=True)
3329
3330     # so check manually all the devices
3331     for name in iv_names:
3332       dev, child, new_drbd = iv_names[name]
3333       cfg.SetDiskID(dev, instance.primary_node)
3334       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3335       if is_degr:
3336         raise errors.OpExecError("MD device %s is degraded!" % name)
3337       cfg.SetDiskID(new_drbd, instance.primary_node)
3338       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3339       if is_degr:
3340         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3341
3342     for name in iv_names:
3343       dev, child, new_drbd = iv_names[name]
3344       logger.Info("remove mirror %s component" % name)
3345       cfg.SetDiskID(dev, instance.primary_node)
3346       if not rpc.call_blockdev_removechild(instance.primary_node,
3347                                                 dev, child):
3348         logger.Error("Can't remove child from mirror, aborting"
3349                      " *this device cleanup*.\nYou need to cleanup manually!!")
3350         continue
3351
3352       for node in child.logical_id[:2]:
3353         logger.Info("remove child device on %s" % node)
3354         cfg.SetDiskID(child, node)
3355         if not rpc.call_blockdev_remove(node, child):
3356           logger.Error("Warning: failed to remove device from node %s,"
3357                        " continuing operation." % node)
3358
3359       dev.children.remove(child)
3360
3361       cfg.AddInstance(instance)
3362
3363
3364 class LUQueryInstanceData(NoHooksLU):
3365   """Query runtime instance data.
3366
3367   """
3368   _OP_REQP = ["instances"]
3369
3370   def CheckPrereq(self):
3371     """Check prerequisites.
3372
3373     This only checks the optional instance list against the existing names.
3374
3375     """
3376     if not isinstance(self.op.instances, list):
3377       raise errors.OpPrereqError("Invalid argument type 'instances'")
3378     if self.op.instances:
3379       self.wanted_instances = []
3380       names = self.op.instances
3381       for name in names:
3382         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3383         if instance is None:
3384           raise errors.OpPrereqError("No such instance name '%s'" % name)
3385       self.wanted_instances.append(instance)
3386     else:
3387       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3388                                in self.cfg.GetInstanceList()]
3389     return
3390
3391
3392   def _ComputeDiskStatus(self, instance, snode, dev):
3393     """Compute block device status.
3394
3395     """
3396     self.cfg.SetDiskID(dev, instance.primary_node)
3397     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3398     if dev.dev_type == "drbd":
3399       # we change the snode then (otherwise we use the one passed in)
3400       if dev.logical_id[0] == instance.primary_node:
3401         snode = dev.logical_id[1]
3402       else:
3403         snode = dev.logical_id[0]
3404
3405     if snode:
3406       self.cfg.SetDiskID(dev, snode)
3407       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3408     else:
3409       dev_sstatus = None
3410
3411     if dev.children:
3412       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3413                       for child in dev.children]
3414     else:
3415       dev_children = []
3416
3417     data = {
3418       "iv_name": dev.iv_name,
3419       "dev_type": dev.dev_type,
3420       "logical_id": dev.logical_id,
3421       "physical_id": dev.physical_id,
3422       "pstatus": dev_pstatus,
3423       "sstatus": dev_sstatus,
3424       "children": dev_children,
3425       }
3426
3427     return data
3428
3429   def Exec(self, feedback_fn):
3430     """Gather and return data"""
3431     result = {}
3432     for instance in self.wanted_instances:
3433       remote_info = rpc.call_instance_info(instance.primary_node,
3434                                                 instance.name)
3435       if remote_info and "state" in remote_info:
3436         remote_state = "up"
3437       else:
3438         remote_state = "down"
3439       if instance.status == "down":
3440         config_state = "down"
3441       else:
3442         config_state = "up"
3443
3444       disks = [self._ComputeDiskStatus(instance, None, device)
3445                for device in instance.disks]
3446
3447       idict = {
3448         "name": instance.name,
3449         "config_state": config_state,
3450         "run_state": remote_state,
3451         "pnode": instance.primary_node,
3452         "snodes": instance.secondary_nodes,
3453         "os": instance.os,
3454         "memory": instance.memory,
3455         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3456         "disks": disks,
3457         }
3458
3459       result[instance.name] = idict
3460
3461     return result
3462
3463
3464 class LUSetInstanceParms(LogicalUnit):
3465   """Modifies an instances's parameters.
3466
3467   """
3468   HPATH = "instance-modify"
3469   HTYPE = constants.HTYPE_INSTANCE
3470   _OP_REQP = ["instance_name"]
3471
3472   def BuildHooksEnv(self):
3473     """Build hooks env.
3474
3475     This runs on the master, primary and secondaries.
3476
3477     """
3478     args = dict()
3479     if self.mem:
3480       args['memory'] = self.mem
3481     if self.vcpus:
3482       args['vcpus'] = self.vcpus
3483     if self.do_ip or self.do_bridge:
3484       if self.do_ip:
3485         ip = self.ip
3486       else:
3487         ip = self.instance.nics[0].ip
3488       if self.bridge:
3489         bridge = self.bridge
3490       else:
3491         bridge = self.instance.nics[0].bridge
3492       args['nics'] = [(ip, bridge)]
3493     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3494     nl = [self.sstore.GetMasterNode(),
3495           self.instance.primary_node] + list(self.instance.secondary_nodes)
3496     return env, nl, nl
3497
3498   def CheckPrereq(self):
3499     """Check prerequisites.
3500
3501     This only checks the instance list against the existing names.
3502
3503     """
3504     self.mem = getattr(self.op, "mem", None)
3505     self.vcpus = getattr(self.op, "vcpus", None)
3506     self.ip = getattr(self.op, "ip", None)
3507     self.bridge = getattr(self.op, "bridge", None)
3508     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3509       raise errors.OpPrereqError("No changes submitted")
3510     if self.mem is not None:
3511       try:
3512         self.mem = int(self.mem)
3513       except ValueError, err:
3514         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3515     if self.vcpus is not None:
3516       try:
3517         self.vcpus = int(self.vcpus)
3518       except ValueError, err:
3519         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3520     if self.ip is not None:
3521       self.do_ip = True
3522       if self.ip.lower() == "none":
3523         self.ip = None
3524       else:
3525         if not utils.IsValidIP(self.ip):
3526           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3527     else:
3528       self.do_ip = False
3529     self.do_bridge = (self.bridge is not None)
3530
3531     instance = self.cfg.GetInstanceInfo(
3532       self.cfg.ExpandInstanceName(self.op.instance_name))
3533     if instance is None:
3534       raise errors.OpPrereqError("No such instance name '%s'" %
3535                                  self.op.instance_name)
3536     self.op.instance_name = instance.name
3537     self.instance = instance
3538     return
3539
3540   def Exec(self, feedback_fn):
3541     """Modifies an instance.
3542
3543     All parameters take effect only at the next restart of the instance.
3544     """
3545     result = []
3546     instance = self.instance
3547     if self.mem:
3548       instance.memory = self.mem
3549       result.append(("mem", self.mem))
3550     if self.vcpus:
3551       instance.vcpus = self.vcpus
3552       result.append(("vcpus",  self.vcpus))
3553     if self.do_ip:
3554       instance.nics[0].ip = self.ip
3555       result.append(("ip", self.ip))
3556     if self.bridge:
3557       instance.nics[0].bridge = self.bridge
3558       result.append(("bridge", self.bridge))
3559
3560     self.cfg.AddInstance(instance)
3561
3562     return result
3563
3564
3565 class LUQueryExports(NoHooksLU):
3566   """Query the exports list
3567
3568   """
3569   _OP_REQP = []
3570
3571   def CheckPrereq(self):
3572     """Check that the nodelist contains only existing nodes.
3573
3574     """
3575     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3576
3577   def Exec(self, feedback_fn):
3578     """Compute the list of all the exported system images.
3579
3580     Returns:
3581       a dictionary with the structure node->(export-list)
3582       where export-list is a list of the instances exported on
3583       that node.
3584
3585     """
3586     return rpc.call_export_list(self.nodes)
3587
3588
3589 class LUExportInstance(LogicalUnit):
3590   """Export an instance to an image in the cluster.
3591
3592   """
3593   HPATH = "instance-export"
3594   HTYPE = constants.HTYPE_INSTANCE
3595   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3596
3597   def BuildHooksEnv(self):
3598     """Build hooks env.
3599
3600     This will run on the master, primary node and target node.
3601
3602     """
3603     env = {
3604       "EXPORT_NODE": self.op.target_node,
3605       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3606       }
3607     env.update(_BuildInstanceHookEnvByObject(self.instance))
3608     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3609           self.op.target_node]
3610     return env, nl, nl
3611
3612   def CheckPrereq(self):
3613     """Check prerequisites.
3614
3615     This checks that the instance name is a valid one.
3616
3617     """
3618     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3619     self.instance = self.cfg.GetInstanceInfo(instance_name)
3620     if self.instance is None:
3621       raise errors.OpPrereqError("Instance '%s' not found" %
3622                                  self.op.instance_name)
3623
3624     # node verification
3625     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3626     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3627
3628     if self.dst_node is None:
3629       raise errors.OpPrereqError("Destination node '%s' is unknown." %
3630                                  self.op.target_node)
3631     self.op.target_node = self.dst_node.name
3632
3633   def Exec(self, feedback_fn):
3634     """Export an instance to an image in the cluster.
3635
3636     """
3637     instance = self.instance
3638     dst_node = self.dst_node
3639     src_node = instance.primary_node
3640     # shutdown the instance, unless requested not to do so
3641     if self.op.shutdown:
3642       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3643       self.processor.ChainOpCode(op, feedback_fn)
3644
3645     vgname = self.cfg.GetVGName()
3646
3647     snap_disks = []
3648
3649     try:
3650       for disk in instance.disks:
3651         if disk.iv_name == "sda":
3652           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3653           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3654
3655           if not new_dev_name:
3656             logger.Error("could not snapshot block device %s on node %s" %
3657                          (disk.logical_id[1], src_node))
3658           else:
3659             new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3660                                       logical_id=(vgname, new_dev_name),
3661                                       physical_id=(vgname, new_dev_name),
3662                                       iv_name=disk.iv_name)
3663             snap_disks.append(new_dev)
3664
3665     finally:
3666       if self.op.shutdown:
3667         op = opcodes.OpStartupInstance(instance_name=instance.name,
3668                                        force=False)
3669         self.processor.ChainOpCode(op, feedback_fn)
3670
3671     # TODO: check for size
3672
3673     for dev in snap_disks:
3674       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3675                                            instance):
3676         logger.Error("could not export block device %s from node"
3677                      " %s to node %s" %
3678                      (dev.logical_id[1], src_node, dst_node.name))
3679       if not rpc.call_blockdev_remove(src_node, dev):
3680         logger.Error("could not remove snapshot block device %s from"
3681                      " node %s" % (dev.logical_id[1], src_node))
3682
3683     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3684       logger.Error("could not finalize export for instance %s on node %s" %
3685                    (instance.name, dst_node.name))
3686
3687     nodelist = self.cfg.GetNodeList()
3688     nodelist.remove(dst_node.name)
3689
3690     # on one-node clusters nodelist will be empty after the removal
3691     # if we proceed the backup would be removed because OpQueryExports
3692     # substitutes an empty list with the full cluster node list.
3693     if nodelist:
3694       op = opcodes.OpQueryExports(nodes=nodelist)
3695       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3696       for node in exportlist:
3697         if instance.name in exportlist[node]:
3698           if not rpc.call_export_remove(node, instance.name):
3699             logger.Error("could not remove older export for instance %s"
3700                          " on node %s" % (instance.name, node))
3701
3702
3703 class TagsLU(NoHooksLU):
3704   """Generic tags LU.
3705
3706   This is an abstract class which is the parent of all the other tags LUs.
3707
3708   """
3709   def CheckPrereq(self):
3710     """Check prerequisites.
3711
3712     """
3713     if self.op.kind == constants.TAG_CLUSTER:
3714       self.target = self.cfg.GetClusterInfo()
3715     elif self.op.kind == constants.TAG_NODE:
3716       name = self.cfg.ExpandNodeName(self.op.name)
3717       if name is None:
3718         raise errors.OpPrereqError("Invalid node name (%s)" %
3719                                    (self.op.name,))
3720       self.op.name = name
3721       self.target = self.cfg.GetNodeInfo(name)
3722     elif self.op.kind == constants.TAG_INSTANCE:
3723       name = self.cfg.ExpandInstanceName(self.op.name)
3724       if name is None:
3725         raise errors.OpPrereqError("Invalid instance name (%s)" %
3726                                    (self.op.name,))
3727       self.op.name = name
3728       self.target = self.cfg.GetInstanceInfo(name)
3729     else:
3730       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3731                                  str(self.op.kind))
3732
3733
3734 class LUGetTags(TagsLU):
3735   """Returns the tags of a given object.
3736
3737   """
3738   _OP_REQP = ["kind", "name"]
3739
3740   def Exec(self, feedback_fn):
3741     """Returns the tag list.
3742
3743     """
3744     return self.target.GetTags()
3745
3746
3747 class LUAddTags(TagsLU):
3748   """Sets a tag on a given object.
3749
3750   """
3751   _OP_REQP = ["kind", "name", "tags"]
3752
3753   def CheckPrereq(self):
3754     """Check prerequisites.
3755
3756     This checks the type and length of the tag name and value.
3757
3758     """
3759     TagsLU.CheckPrereq(self)
3760     for tag in self.op.tags:
3761       objects.TaggableObject.ValidateTag(tag)
3762
3763   def Exec(self, feedback_fn):
3764     """Sets the tag.
3765
3766     """
3767     try:
3768       for tag in self.op.tags:
3769         self.target.AddTag(tag)
3770     except errors.TagError, err:
3771       raise errors.OpExecError("Error while setting tag: %s" % str(err))
3772     try:
3773       self.cfg.Update(self.target)
3774     except errors.ConfigurationError:
3775       raise errors.OpRetryError("There has been a modification to the"
3776                                 " config file and the operation has been"
3777                                 " aborted. Please retry.")
3778
3779
3780 class LUDelTags(TagsLU):
3781   """Delete a list of tags from a given object.
3782
3783   """
3784   _OP_REQP = ["kind", "name", "tags"]
3785
3786   def CheckPrereq(self):
3787     """Check prerequisites.
3788
3789     This checks that we have the given tag.
3790
3791     """
3792     TagsLU.CheckPrereq(self)
3793     for tag in self.op.tags:
3794       objects.TaggableObject.ValidateTag(tag)
3795     del_tags = frozenset(self.op.tags)
3796     cur_tags = self.target.GetTags()
3797     if not del_tags <= cur_tags:
3798       diff_tags = del_tags - cur_tags
3799       diff_names = ["'%s'" % tag for tag in diff_tags]
3800       diff_names.sort()
3801       raise errors.OpPrereqError("Tag(s) %s not found" %
3802                                  (",".join(diff_names)))
3803
3804   def Exec(self, feedback_fn):
3805     """Remove the tag from the object.
3806
3807     """
3808     for tag in self.op.tags:
3809       self.target.RemoveTag(tag)
3810     try:
3811       self.cfg.Update(self.target)
3812     except errors.ConfigurationError:
3813       raise errors.OpRetryError("There has been a modification to the"
3814                                 " config file and the operation has been"
3815                                 " aborted. Please retry.")