Replace more ssh paths with proper constants
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import socket
30 import time
31 import tempfile
32 import re
33 import platform
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.processor = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     for attr_name in self._OP_REQP:
78       attr_val = getattr(op, attr_name, None)
79       if attr_val is None:
80         raise errors.OpPrereqError("Required parameter '%s' missing" %
81                                    attr_name)
82     if self.REQ_CLUSTER:
83       if not cfg.IsCluster():
84         raise errors.OpPrereqError("Cluster not initialized yet,"
85                                    " use 'gnt-cluster init' first.")
86       if self.REQ_MASTER:
87         master = sstore.GetMasterNode()
88         if master != utils.HostInfo().name:
89           raise errors.OpPrereqError("Commands must be run on the master"
90                                      " node %s" % master)
91
92   def CheckPrereq(self):
93     """Check prerequisites for this LU.
94
95     This method should check that the prerequisites for the execution
96     of this LU are fulfilled. It can do internode communication, but
97     it should be idempotent - no cluster or system changes are
98     allowed.
99
100     The method should raise errors.OpPrereqError in case something is
101     not fulfilled. Its return value is ignored.
102
103     This method should also update all the parameters of the opcode to
104     their canonical form; e.g. a short node name must be fully
105     expanded after this method has successfully completed (so that
106     hooks, logging, etc. work correctly).
107
108     """
109     raise NotImplementedError
110
111   def Exec(self, feedback_fn):
112     """Execute the LU.
113
114     This method should implement the actual work. It should raise
115     errors.OpExecError for failures that are somewhat dealt with in
116     code, or expected.
117
118     """
119     raise NotImplementedError
120
121   def BuildHooksEnv(self):
122     """Build hooks environment for this LU.
123
124     This method should return a three-node tuple consisting of: a dict
125     containing the environment that will be used for running the
126     specific hook for this LU, a list of node names on which the hook
127     should run before the execution, and a list of node names on which
128     the hook should run after the execution.
129
130     The keys of the dict must not have 'GANETI_' prefixed as this will
131     be handled in the hooks runner. Also note additional keys will be
132     added by the hooks runner. If the LU doesn't define any
133     environment, an empty dict (and not None) should be returned.
134
135     As for the node lists, the master should not be included in the
136     them, as it will be added by the hooks runner in case this LU
137     requires a cluster to run on (otherwise we don't have a node
138     list). No nodes should be returned as an empty list (and not
139     None).
140
141     Note that if the HPATH for a LU class is None, this function will
142     not be called.
143
144     """
145     raise NotImplementedError
146
147
148 class NoHooksLU(LogicalUnit):
149   """Simple LU which runs no hooks.
150
151   This LU is intended as a parent for other LogicalUnits which will
152   run no hooks, in order to reduce duplicate code.
153
154   """
155   HPATH = None
156   HTYPE = None
157
158   def BuildHooksEnv(self):
159     """Build hooks env.
160
161     This is a no-op, since we don't run hooks.
162
163     """
164     return {}, [], []
165
166
167 def _GetWantedNodes(lu, nodes):
168   """Returns list of checked and expanded node names.
169
170   Args:
171     nodes: List of nodes (strings) or None for all
172
173   """
174   if not isinstance(nodes, list):
175     raise errors.OpPrereqError("Invalid argument type 'nodes'")
176
177   if nodes:
178     wanted = []
179
180     for name in nodes:
181       node = lu.cfg.ExpandNodeName(name)
182       if node is None:
183         raise errors.OpPrereqError("No such node name '%s'" % name)
184       wanted.append(node)
185
186   else:
187     wanted = lu.cfg.GetNodeList()
188   return utils.NiceSort(wanted)
189
190
191 def _GetWantedInstances(lu, instances):
192   """Returns list of checked and expanded instance names.
193
194   Args:
195     instances: List of instances (strings) or None for all
196
197   """
198   if not isinstance(instances, list):
199     raise errors.OpPrereqError("Invalid argument type 'instances'")
200
201   if instances:
202     wanted = []
203
204     for name in instances:
205       instance = lu.cfg.ExpandInstanceName(name)
206       if instance is None:
207         raise errors.OpPrereqError("No such instance name '%s'" % name)
208       wanted.append(instance)
209
210   else:
211     wanted = lu.cfg.GetInstanceList()
212   return utils.NiceSort(wanted)
213
214
215 def _CheckOutputFields(static, dynamic, selected):
216   """Checks whether all selected fields are valid.
217
218   Args:
219     static: Static fields
220     dynamic: Dynamic fields
221
222   """
223   static_fields = frozenset(static)
224   dynamic_fields = frozenset(dynamic)
225
226   all_fields = static_fields | dynamic_fields
227
228   if not all_fields.issuperset(selected):
229     raise errors.OpPrereqError("Unknown output fields selected: %s"
230                                % ",".join(frozenset(selected).
231                                           difference(all_fields)))
232
233
234 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
235                           memory, vcpus, nics):
236   """Builds instance related env variables for hooks from single variables.
237
238   Args:
239     secondary_nodes: List of secondary nodes as strings
240   """
241   env = {
242     "OP_TARGET": name,
243     "INSTANCE_NAME": name,
244     "INSTANCE_PRIMARY": primary_node,
245     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
246     "INSTANCE_OS_TYPE": os_type,
247     "INSTANCE_STATUS": status,
248     "INSTANCE_MEMORY": memory,
249     "INSTANCE_VCPUS": vcpus,
250   }
251
252   if nics:
253     nic_count = len(nics)
254     for idx, (ip, bridge) in enumerate(nics):
255       if ip is None:
256         ip = ""
257       env["INSTANCE_NIC%d_IP" % idx] = ip
258       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
259   else:
260     nic_count = 0
261
262   env["INSTANCE_NIC_COUNT"] = nic_count
263
264   return env
265
266
267 def _BuildInstanceHookEnvByObject(instance, override=None):
268   """Builds instance related env variables for hooks from an object.
269
270   Args:
271     instance: objects.Instance object of instance
272     override: dict of values to override
273   """
274   args = {
275     'name': instance.name,
276     'primary_node': instance.primary_node,
277     'secondary_nodes': instance.secondary_nodes,
278     'os_type': instance.os,
279     'status': instance.os,
280     'memory': instance.memory,
281     'vcpus': instance.vcpus,
282     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
283   }
284   if override:
285     args.update(override)
286   return _BuildInstanceHookEnv(**args)
287
288
289 def _UpdateEtcHosts(fullnode, ip):
290   """Ensure a node has a correct entry in /etc/hosts.
291
292   Args:
293     fullnode - Fully qualified domain name of host. (str)
294     ip       - IPv4 address of host (str)
295
296   """
297   node = fullnode.split(".", 1)[0]
298
299   f = open('/etc/hosts', 'r+')
300
301   inthere = False
302
303   save_lines = []
304   add_lines = []
305   removed = False
306
307   while True:
308     rawline = f.readline()
309
310     if not rawline:
311       # End of file
312       break
313
314     line = rawline.split('\n')[0]
315
316     # Strip off comments
317     line = line.split('#')[0]
318
319     if not line:
320       # Entire line was comment, skip
321       save_lines.append(rawline)
322       continue
323
324     fields = line.split()
325
326     haveall = True
327     havesome = False
328     for spec in [ ip, fullnode, node ]:
329       if spec not in fields:
330         haveall = False
331       if spec in fields:
332         havesome = True
333
334     if haveall:
335       inthere = True
336       save_lines.append(rawline)
337       continue
338
339     if havesome and not haveall:
340       # Line (old, or manual?) which is missing some.  Remove.
341       removed = True
342       continue
343
344     save_lines.append(rawline)
345
346   if not inthere:
347     add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
348
349   if removed:
350     if add_lines:
351       save_lines = save_lines + add_lines
352
353     # We removed a line, write a new file and replace old.
354     fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
355     newfile = os.fdopen(fd, 'w')
356     newfile.write(''.join(save_lines))
357     newfile.close()
358     os.rename(tmpname, '/etc/hosts')
359
360   elif add_lines:
361     # Simply appending a new line will do the trick.
362     f.seek(0, 2)
363     for add in add_lines:
364       f.write(add)
365
366   f.close()
367
368
369 def _UpdateKnownHosts(fullnode, ip, pubkey):
370   """Ensure a node has a correct known_hosts entry.
371
372   Args:
373     fullnode - Fully qualified domain name of host. (str)
374     ip       - IPv4 address of host (str)
375     pubkey   - the public key of the cluster
376
377   """
378   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
379     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
380   else:
381     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
382
383   inthere = False
384
385   save_lines = []
386   add_lines = []
387   removed = False
388
389   while True:
390     rawline = f.readline()
391     logger.Debug('read %s' % (repr(rawline),))
392
393     if not rawline:
394       # End of file
395       break
396
397     line = rawline.split('\n')[0]
398
399     parts = line.split(' ')
400     fields = parts[0].split(',')
401     key = parts[2]
402
403     haveall = True
404     havesome = False
405     for spec in [ ip, fullnode ]:
406       if spec not in fields:
407         haveall = False
408       if spec in fields:
409         havesome = True
410
411     logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
412     if haveall and key == pubkey:
413       inthere = True
414       save_lines.append(rawline)
415       logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
416       continue
417
418     if havesome and (not haveall or key != pubkey):
419       removed = True
420       logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
421       continue
422
423     save_lines.append(rawline)
424
425   if not inthere:
426     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
427     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
428
429   if removed:
430     save_lines = save_lines + add_lines
431
432     # Write a new file and replace old.
433     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
434                                    constants.DATA_DIR)
435     newfile = os.fdopen(fd, 'w')
436     try:
437       newfile.write(''.join(save_lines))
438     finally:
439       newfile.close()
440     logger.Debug("Wrote new known_hosts.")
441     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
442
443   elif add_lines:
444     # Simply appending a new line will do the trick.
445     f.seek(0, 2)
446     for add in add_lines:
447       f.write(add)
448
449   f.close()
450
451
452 def _HasValidVG(vglist, vgname):
453   """Checks if the volume group list is valid.
454
455   A non-None return value means there's an error, and the return value
456   is the error message.
457
458   """
459   vgsize = vglist.get(vgname, None)
460   if vgsize is None:
461     return "volume group '%s' missing" % vgname
462   elif vgsize < 20480:
463     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
464             (vgname, vgsize))
465   return None
466
467
468 def _InitSSHSetup(node):
469   """Setup the SSH configuration for the cluster.
470
471
472   This generates a dsa keypair for root, adds the pub key to the
473   permitted hosts and adds the hostkey to its own known hosts.
474
475   Args:
476     node: the name of this host as a fqdn
477
478   """
479   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
480
481   for name in priv_key, pub_key:
482     if os.path.exists(name):
483       utils.CreateBackup(name)
484     utils.RemoveFile(name)
485
486   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
487                          "-f", priv_key,
488                          "-q", "-N", ""])
489   if result.failed:
490     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
491                              result.output)
492
493   f = open(pub_key, 'r')
494   try:
495     utils.AddAuthorizedKey(auth_keys, f.read(8192))
496   finally:
497     f.close()
498
499
500 def _InitGanetiServerSetup(ss):
501   """Setup the necessary configuration for the initial node daemon.
502
503   This creates the nodepass file containing the shared password for
504   the cluster and also generates the SSL certificate.
505
506   """
507   # Create pseudo random password
508   randpass = sha.new(os.urandom(64)).hexdigest()
509   # and write it into sstore
510   ss.SetKey(ss.SS_NODED_PASS, randpass)
511
512   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
513                          "-days", str(365*5), "-nodes", "-x509",
514                          "-keyout", constants.SSL_CERT_FILE,
515                          "-out", constants.SSL_CERT_FILE, "-batch"])
516   if result.failed:
517     raise errors.OpExecError("could not generate server ssl cert, command"
518                              " %s had exitcode %s and error message %s" %
519                              (result.cmd, result.exit_code, result.output))
520
521   os.chmod(constants.SSL_CERT_FILE, 0400)
522
523   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
524
525   if result.failed:
526     raise errors.OpExecError("Could not start the node daemon, command %s"
527                              " had exitcode %s and error %s" %
528                              (result.cmd, result.exit_code, result.output))
529
530
531 class LUInitCluster(LogicalUnit):
532   """Initialise the cluster.
533
534   """
535   HPATH = "cluster-init"
536   HTYPE = constants.HTYPE_CLUSTER
537   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
538               "def_bridge", "master_netdev"]
539   REQ_CLUSTER = False
540
541   def BuildHooksEnv(self):
542     """Build hooks env.
543
544     Notes: Since we don't require a cluster, we must manually add
545     ourselves in the post-run node list.
546
547     """
548     env = {"OP_TARGET": self.op.cluster_name}
549     return env, [], [self.hostname.name]
550
551   def CheckPrereq(self):
552     """Verify that the passed name is a valid one.
553
554     """
555     if config.ConfigWriter.IsCluster():
556       raise errors.OpPrereqError("Cluster is already initialised")
557
558     self.hostname = hostname = utils.HostInfo()
559
560     if hostname.ip.startswith("127."):
561       raise errors.OpPrereqError("This host's IP resolves to the private"
562                                  " range (%s). Please fix DNS or /etc/hosts." %
563                                  (hostname.ip,))
564
565     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
566
567     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
568                          constants.DEFAULT_NODED_PORT):
569       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
570                                  " to %s,\nbut this ip address does not"
571                                  " belong to this host."
572                                  " Aborting." % hostname.ip)
573
574     secondary_ip = getattr(self.op, "secondary_ip", None)
575     if secondary_ip and not utils.IsValidIP(secondary_ip):
576       raise errors.OpPrereqError("Invalid secondary ip given")
577     if (secondary_ip and
578         secondary_ip != hostname.ip and
579         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
580                            constants.DEFAULT_NODED_PORT))):
581       raise errors.OpPrereqError("You gave %s as secondary IP,\n"
582                                  "but it does not belong to this host." %
583                                  secondary_ip)
584     self.secondary_ip = secondary_ip
585
586     # checks presence of the volume group given
587     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
588
589     if vgstatus:
590       raise errors.OpPrereqError("Error: %s" % vgstatus)
591
592     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
593                     self.op.mac_prefix):
594       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
595                                  self.op.mac_prefix)
596
597     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
598       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
599                                  self.op.hypervisor_type)
600
601     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
602     if result.failed:
603       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
604                                  (self.op.master_netdev,
605                                   result.output.strip()))
606
607   def Exec(self, feedback_fn):
608     """Initialize the cluster.
609
610     """
611     clustername = self.clustername
612     hostname = self.hostname
613
614     # set up the simple store
615     self.sstore = ss = ssconf.SimpleStore()
616     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
617     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
618     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
619     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
620     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
621
622     # set up the inter-node password and certificate
623     _InitGanetiServerSetup(ss)
624
625     # start the master ip
626     rpc.call_node_start_master(hostname.name)
627
628     # set up ssh config and /etc/hosts
629     f = open(constants.SSH_HOST_RSA_PUB, 'r')
630     try:
631       sshline = f.read()
632     finally:
633       f.close()
634     sshkey = sshline.split(" ")[1]
635
636     _UpdateEtcHosts(hostname.name, hostname.ip)
637
638     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
639
640     _InitSSHSetup(hostname.name)
641
642     # init of cluster config file
643     self.cfg = cfgw = config.ConfigWriter()
644     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
645                     sshkey, self.op.mac_prefix,
646                     self.op.vg_name, self.op.def_bridge)
647
648
649 class LUDestroyCluster(NoHooksLU):
650   """Logical unit for destroying the cluster.
651
652   """
653   _OP_REQP = []
654
655   def CheckPrereq(self):
656     """Check prerequisites.
657
658     This checks whether the cluster is empty.
659
660     Any errors are signalled by raising errors.OpPrereqError.
661
662     """
663     master = self.sstore.GetMasterNode()
664
665     nodelist = self.cfg.GetNodeList()
666     if len(nodelist) != 1 or nodelist[0] != master:
667       raise errors.OpPrereqError("There are still %d node(s) in"
668                                  " this cluster." % (len(nodelist) - 1))
669     instancelist = self.cfg.GetInstanceList()
670     if instancelist:
671       raise errors.OpPrereqError("There are still %d instance(s) in"
672                                  " this cluster." % len(instancelist))
673
674   def Exec(self, feedback_fn):
675     """Destroys the cluster.
676
677     """
678     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
679     utils.CreateBackup(priv_key)
680     utils.CreateBackup(pub_key)
681     rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
682
683
684 class LUVerifyCluster(NoHooksLU):
685   """Verifies the cluster status.
686
687   """
688   _OP_REQP = []
689
690   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
691                   remote_version, feedback_fn):
692     """Run multiple tests against a node.
693
694     Test list:
695       - compares ganeti version
696       - checks vg existance and size > 20G
697       - checks config file checksum
698       - checks ssh to other nodes
699
700     Args:
701       node: name of the node to check
702       file_list: required list of files
703       local_cksum: dictionary of local files and their checksums
704
705     """
706     # compares ganeti version
707     local_version = constants.PROTOCOL_VERSION
708     if not remote_version:
709       feedback_fn(" - ERROR: connection to %s failed" % (node))
710       return True
711
712     if local_version != remote_version:
713       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
714                       (local_version, node, remote_version))
715       return True
716
717     # checks vg existance and size > 20G
718
719     bad = False
720     if not vglist:
721       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
722                       (node,))
723       bad = True
724     else:
725       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
726       if vgstatus:
727         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
728         bad = True
729
730     # checks config file checksum
731     # checks ssh to any
732
733     if 'filelist' not in node_result:
734       bad = True
735       feedback_fn("  - ERROR: node hasn't returned file checksum data")
736     else:
737       remote_cksum = node_result['filelist']
738       for file_name in file_list:
739         if file_name not in remote_cksum:
740           bad = True
741           feedback_fn("  - ERROR: file '%s' missing" % file_name)
742         elif remote_cksum[file_name] != local_cksum[file_name]:
743           bad = True
744           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
745
746     if 'nodelist' not in node_result:
747       bad = True
748       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
749     else:
750       if node_result['nodelist']:
751         bad = True
752         for node in node_result['nodelist']:
753           feedback_fn("  - ERROR: communication with node '%s': %s" %
754                           (node, node_result['nodelist'][node]))
755     hyp_result = node_result.get('hypervisor', None)
756     if hyp_result is not None:
757       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
758     return bad
759
760   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
761     """Verify an instance.
762
763     This function checks to see if the required block devices are
764     available on the instance's node.
765
766     """
767     bad = False
768
769     instancelist = self.cfg.GetInstanceList()
770     if not instance in instancelist:
771       feedback_fn("  - ERROR: instance %s not in instance list %s" %
772                       (instance, instancelist))
773       bad = True
774
775     instanceconfig = self.cfg.GetInstanceInfo(instance)
776     node_current = instanceconfig.primary_node
777
778     node_vol_should = {}
779     instanceconfig.MapLVsByNode(node_vol_should)
780
781     for node in node_vol_should:
782       for volume in node_vol_should[node]:
783         if node not in node_vol_is or volume not in node_vol_is[node]:
784           feedback_fn("  - ERROR: volume %s missing on node %s" %
785                           (volume, node))
786           bad = True
787
788     if not instanceconfig.status == 'down':
789       if not instance in node_instance[node_current]:
790         feedback_fn("  - ERROR: instance %s not running on node %s" %
791                         (instance, node_current))
792         bad = True
793
794     for node in node_instance:
795       if (not node == node_current):
796         if instance in node_instance[node]:
797           feedback_fn("  - ERROR: instance %s should not run on node %s" %
798                           (instance, node))
799           bad = True
800
801     return not bad
802
803   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
804     """Verify if there are any unknown volumes in the cluster.
805
806     The .os, .swap and backup volumes are ignored. All other volumes are
807     reported as unknown.
808
809     """
810     bad = False
811
812     for node in node_vol_is:
813       for volume in node_vol_is[node]:
814         if node not in node_vol_should or volume not in node_vol_should[node]:
815           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
816                       (volume, node))
817           bad = True
818     return bad
819
820   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
821     """Verify the list of running instances.
822
823     This checks what instances are running but unknown to the cluster.
824
825     """
826     bad = False
827     for node in node_instance:
828       for runninginstance in node_instance[node]:
829         if runninginstance not in instancelist:
830           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
831                           (runninginstance, node))
832           bad = True
833     return bad
834
835   def CheckPrereq(self):
836     """Check prerequisites.
837
838     This has no prerequisites.
839
840     """
841     pass
842
843   def Exec(self, feedback_fn):
844     """Verify integrity of cluster, performing various test on nodes.
845
846     """
847     bad = False
848     feedback_fn("* Verifying global settings")
849     self.cfg.VerifyConfig()
850
851     master = self.sstore.GetMasterNode()
852     vg_name = self.cfg.GetVGName()
853     nodelist = utils.NiceSort(self.cfg.GetNodeList())
854     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
855     node_volume = {}
856     node_instance = {}
857
858     # FIXME: verify OS list
859     # do local checksums
860     file_names = list(self.sstore.GetFileList())
861     file_names.append(constants.SSL_CERT_FILE)
862     file_names.append(constants.CLUSTER_CONF_FILE)
863     local_checksums = utils.FingerprintFiles(file_names)
864
865     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
866     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
867     all_instanceinfo = rpc.call_instance_list(nodelist)
868     all_vglist = rpc.call_vg_list(nodelist)
869     node_verify_param = {
870       'filelist': file_names,
871       'nodelist': nodelist,
872       'hypervisor': None,
873       }
874     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
875     all_rversion = rpc.call_version(nodelist)
876
877     for node in nodelist:
878       feedback_fn("* Verifying node %s" % node)
879       result = self._VerifyNode(node, file_names, local_checksums,
880                                 all_vglist[node], all_nvinfo[node],
881                                 all_rversion[node], feedback_fn)
882       bad = bad or result
883
884       # node_volume
885       volumeinfo = all_volumeinfo[node]
886
887       if type(volumeinfo) != dict:
888         feedback_fn("  - ERROR: connection to %s failed" % (node,))
889         bad = True
890         continue
891
892       node_volume[node] = volumeinfo
893
894       # node_instance
895       nodeinstance = all_instanceinfo[node]
896       if type(nodeinstance) != list:
897         feedback_fn("  - ERROR: connection to %s failed" % (node,))
898         bad = True
899         continue
900
901       node_instance[node] = nodeinstance
902
903     node_vol_should = {}
904
905     for instance in instancelist:
906       feedback_fn("* Verifying instance %s" % instance)
907       result =  self._VerifyInstance(instance, node_volume, node_instance,
908                                      feedback_fn)
909       bad = bad or result
910
911       inst_config = self.cfg.GetInstanceInfo(instance)
912
913       inst_config.MapLVsByNode(node_vol_should)
914
915     feedback_fn("* Verifying orphan volumes")
916     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
917                                        feedback_fn)
918     bad = bad or result
919
920     feedback_fn("* Verifying remaining instances")
921     result = self._VerifyOrphanInstances(instancelist, node_instance,
922                                          feedback_fn)
923     bad = bad or result
924
925     return int(bad)
926
927
928 class LURenameCluster(LogicalUnit):
929   """Rename the cluster.
930
931   """
932   HPATH = "cluster-rename"
933   HTYPE = constants.HTYPE_CLUSTER
934   _OP_REQP = ["name"]
935
936   def BuildHooksEnv(self):
937     """Build hooks env.
938
939     """
940     env = {
941       "OP_TARGET": self.op.sstore.GetClusterName(),
942       "NEW_NAME": self.op.name,
943       }
944     mn = self.sstore.GetMasterNode()
945     return env, [mn], [mn]
946
947   def CheckPrereq(self):
948     """Verify that the passed name is a valid one.
949
950     """
951     hostname = utils.HostInfo(self.op.name)
952
953     new_name = hostname.name
954     self.ip = new_ip = hostname.ip
955     old_name = self.sstore.GetClusterName()
956     old_ip = self.sstore.GetMasterIP()
957     if new_name == old_name and new_ip == old_ip:
958       raise errors.OpPrereqError("Neither the name nor the IP address of the"
959                                  " cluster has changed")
960     if new_ip != old_ip:
961       result = utils.RunCmd(["fping", "-q", new_ip])
962       if not result.failed:
963         raise errors.OpPrereqError("The given cluster IP address (%s) is"
964                                    " reachable on the network. Aborting." %
965                                    new_ip)
966
967     self.op.name = new_name
968
969   def Exec(self, feedback_fn):
970     """Rename the cluster.
971
972     """
973     clustername = self.op.name
974     ip = self.ip
975     ss = self.sstore
976
977     # shutdown the master IP
978     master = ss.GetMasterNode()
979     if not rpc.call_node_stop_master(master):
980       raise errors.OpExecError("Could not disable the master role")
981
982     try:
983       # modify the sstore
984       ss.SetKey(ss.SS_MASTER_IP, ip)
985       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
986
987       # Distribute updated ss config to all nodes
988       myself = self.cfg.GetNodeInfo(master)
989       dist_nodes = self.cfg.GetNodeList()
990       if myself.name in dist_nodes:
991         dist_nodes.remove(myself.name)
992
993       logger.Debug("Copying updated ssconf data to all nodes")
994       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
995         fname = ss.KeyToFilename(keyname)
996         result = rpc.call_upload_file(dist_nodes, fname)
997         for to_node in dist_nodes:
998           if not result[to_node]:
999             logger.Error("copy of file %s to node %s failed" %
1000                          (fname, to_node))
1001     finally:
1002       if not rpc.call_node_start_master(master):
1003         logger.Error("Could not re-enable the master role on the master,\n"
1004                      "please restart manually.")
1005
1006
1007 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
1008   """Sleep and poll for an instance's disk to sync.
1009
1010   """
1011   if not instance.disks:
1012     return True
1013
1014   if not oneshot:
1015     logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
1016
1017   node = instance.primary_node
1018
1019   for dev in instance.disks:
1020     cfgw.SetDiskID(dev, node)
1021
1022   retries = 0
1023   while True:
1024     max_time = 0
1025     done = True
1026     cumul_degraded = False
1027     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1028     if not rstats:
1029       logger.ToStderr("Can't get any data from node %s" % node)
1030       retries += 1
1031       if retries >= 10:
1032         raise errors.RemoteError("Can't contact node %s for mirror data,"
1033                                  " aborting." % node)
1034       time.sleep(6)
1035       continue
1036     retries = 0
1037     for i in range(len(rstats)):
1038       mstat = rstats[i]
1039       if mstat is None:
1040         logger.ToStderr("Can't compute data for node %s/%s" %
1041                         (node, instance.disks[i].iv_name))
1042         continue
1043       perc_done, est_time, is_degraded = mstat
1044       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1045       if perc_done is not None:
1046         done = False
1047         if est_time is not None:
1048           rem_time = "%d estimated seconds remaining" % est_time
1049           max_time = est_time
1050         else:
1051           rem_time = "no time estimate"
1052         logger.ToStdout("- device %s: %5.2f%% done, %s" %
1053                         (instance.disks[i].iv_name, perc_done, rem_time))
1054     if done or oneshot:
1055       break
1056
1057     if unlock:
1058       utils.Unlock('cmd')
1059     try:
1060       time.sleep(min(60, max_time))
1061     finally:
1062       if unlock:
1063         utils.Lock('cmd')
1064
1065   if done:
1066     logger.ToStdout("Instance %s's disks are in sync." % instance.name)
1067   return not cumul_degraded
1068
1069
1070 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
1071   """Check that mirrors are not degraded.
1072
1073   """
1074   cfgw.SetDiskID(dev, node)
1075
1076   result = True
1077   if on_primary or dev.AssembleOnSecondary():
1078     rstats = rpc.call_blockdev_find(node, dev)
1079     if not rstats:
1080       logger.ToStderr("Can't get any data from node %s" % node)
1081       result = False
1082     else:
1083       result = result and (not rstats[5])
1084   if dev.children:
1085     for child in dev.children:
1086       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1087
1088   return result
1089
1090
1091 class LUDiagnoseOS(NoHooksLU):
1092   """Logical unit for OS diagnose/query.
1093
1094   """
1095   _OP_REQP = []
1096
1097   def CheckPrereq(self):
1098     """Check prerequisites.
1099
1100     This always succeeds, since this is a pure query LU.
1101
1102     """
1103     return
1104
1105   def Exec(self, feedback_fn):
1106     """Compute the list of OSes.
1107
1108     """
1109     node_list = self.cfg.GetNodeList()
1110     node_data = rpc.call_os_diagnose(node_list)
1111     if node_data == False:
1112       raise errors.OpExecError("Can't gather the list of OSes")
1113     return node_data
1114
1115
1116 class LURemoveNode(LogicalUnit):
1117   """Logical unit for removing a node.
1118
1119   """
1120   HPATH = "node-remove"
1121   HTYPE = constants.HTYPE_NODE
1122   _OP_REQP = ["node_name"]
1123
1124   def BuildHooksEnv(self):
1125     """Build hooks env.
1126
1127     This doesn't run on the target node in the pre phase as a failed
1128     node would not allows itself to run.
1129
1130     """
1131     env = {
1132       "OP_TARGET": self.op.node_name,
1133       "NODE_NAME": self.op.node_name,
1134       }
1135     all_nodes = self.cfg.GetNodeList()
1136     all_nodes.remove(self.op.node_name)
1137     return env, all_nodes, all_nodes
1138
1139   def CheckPrereq(self):
1140     """Check prerequisites.
1141
1142     This checks:
1143      - the node exists in the configuration
1144      - it does not have primary or secondary instances
1145      - it's not the master
1146
1147     Any errors are signalled by raising errors.OpPrereqError.
1148
1149     """
1150     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1151     if node is None:
1152       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1153
1154     instance_list = self.cfg.GetInstanceList()
1155
1156     masternode = self.sstore.GetMasterNode()
1157     if node.name == masternode:
1158       raise errors.OpPrereqError("Node is the master node,"
1159                                  " you need to failover first.")
1160
1161     for instance_name in instance_list:
1162       instance = self.cfg.GetInstanceInfo(instance_name)
1163       if node.name == instance.primary_node:
1164         raise errors.OpPrereqError("Instance %s still running on the node,"
1165                                    " please remove first." % instance_name)
1166       if node.name in instance.secondary_nodes:
1167         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1168                                    " please remove first." % instance_name)
1169     self.op.node_name = node.name
1170     self.node = node
1171
1172   def Exec(self, feedback_fn):
1173     """Removes the node from the cluster.
1174
1175     """
1176     node = self.node
1177     logger.Info("stopping the node daemon and removing configs from node %s" %
1178                 node.name)
1179
1180     rpc.call_node_leave_cluster(node.name)
1181
1182     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1183
1184     logger.Info("Removing node %s from config" % node.name)
1185
1186     self.cfg.RemoveNode(node.name)
1187
1188
1189 class LUQueryNodes(NoHooksLU):
1190   """Logical unit for querying nodes.
1191
1192   """
1193   _OP_REQP = ["output_fields", "names"]
1194
1195   def CheckPrereq(self):
1196     """Check prerequisites.
1197
1198     This checks that the fields required are valid output fields.
1199
1200     """
1201     self.dynamic_fields = frozenset(["dtotal", "dfree",
1202                                      "mtotal", "mnode", "mfree",
1203                                      "bootid"])
1204
1205     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1206                                "pinst_list", "sinst_list",
1207                                "pip", "sip"],
1208                        dynamic=self.dynamic_fields,
1209                        selected=self.op.output_fields)
1210
1211     self.wanted = _GetWantedNodes(self, self.op.names)
1212
1213   def Exec(self, feedback_fn):
1214     """Computes the list of nodes and their attributes.
1215
1216     """
1217     nodenames = self.wanted
1218     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1219
1220     # begin data gathering
1221
1222     if self.dynamic_fields.intersection(self.op.output_fields):
1223       live_data = {}
1224       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1225       for name in nodenames:
1226         nodeinfo = node_data.get(name, None)
1227         if nodeinfo:
1228           live_data[name] = {
1229             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1230             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1231             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1232             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1233             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1234             "bootid": nodeinfo['bootid'],
1235             }
1236         else:
1237           live_data[name] = {}
1238     else:
1239       live_data = dict.fromkeys(nodenames, {})
1240
1241     node_to_primary = dict([(name, set()) for name in nodenames])
1242     node_to_secondary = dict([(name, set()) for name in nodenames])
1243
1244     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1245                              "sinst_cnt", "sinst_list"))
1246     if inst_fields & frozenset(self.op.output_fields):
1247       instancelist = self.cfg.GetInstanceList()
1248
1249       for instance_name in instancelist:
1250         inst = self.cfg.GetInstanceInfo(instance_name)
1251         if inst.primary_node in node_to_primary:
1252           node_to_primary[inst.primary_node].add(inst.name)
1253         for secnode in inst.secondary_nodes:
1254           if secnode in node_to_secondary:
1255             node_to_secondary[secnode].add(inst.name)
1256
1257     # end data gathering
1258
1259     output = []
1260     for node in nodelist:
1261       node_output = []
1262       for field in self.op.output_fields:
1263         if field == "name":
1264           val = node.name
1265         elif field == "pinst_list":
1266           val = list(node_to_primary[node.name])
1267         elif field == "sinst_list":
1268           val = list(node_to_secondary[node.name])
1269         elif field == "pinst_cnt":
1270           val = len(node_to_primary[node.name])
1271         elif field == "sinst_cnt":
1272           val = len(node_to_secondary[node.name])
1273         elif field == "pip":
1274           val = node.primary_ip
1275         elif field == "sip":
1276           val = node.secondary_ip
1277         elif field in self.dynamic_fields:
1278           val = live_data[node.name].get(field, None)
1279         else:
1280           raise errors.ParameterError(field)
1281         node_output.append(val)
1282       output.append(node_output)
1283
1284     return output
1285
1286
1287 class LUQueryNodeVolumes(NoHooksLU):
1288   """Logical unit for getting volumes on node(s).
1289
1290   """
1291   _OP_REQP = ["nodes", "output_fields"]
1292
1293   def CheckPrereq(self):
1294     """Check prerequisites.
1295
1296     This checks that the fields required are valid output fields.
1297
1298     """
1299     self.nodes = _GetWantedNodes(self, self.op.nodes)
1300
1301     _CheckOutputFields(static=["node"],
1302                        dynamic=["phys", "vg", "name", "size", "instance"],
1303                        selected=self.op.output_fields)
1304
1305
1306   def Exec(self, feedback_fn):
1307     """Computes the list of nodes and their attributes.
1308
1309     """
1310     nodenames = self.nodes
1311     volumes = rpc.call_node_volumes(nodenames)
1312
1313     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1314              in self.cfg.GetInstanceList()]
1315
1316     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1317
1318     output = []
1319     for node in nodenames:
1320       if node not in volumes or not volumes[node]:
1321         continue
1322
1323       node_vols = volumes[node][:]
1324       node_vols.sort(key=lambda vol: vol['dev'])
1325
1326       for vol in node_vols:
1327         node_output = []
1328         for field in self.op.output_fields:
1329           if field == "node":
1330             val = node
1331           elif field == "phys":
1332             val = vol['dev']
1333           elif field == "vg":
1334             val = vol['vg']
1335           elif field == "name":
1336             val = vol['name']
1337           elif field == "size":
1338             val = int(float(vol['size']))
1339           elif field == "instance":
1340             for inst in ilist:
1341               if node not in lv_by_node[inst]:
1342                 continue
1343               if vol['name'] in lv_by_node[inst][node]:
1344                 val = inst.name
1345                 break
1346             else:
1347               val = '-'
1348           else:
1349             raise errors.ParameterError(field)
1350           node_output.append(str(val))
1351
1352         output.append(node_output)
1353
1354     return output
1355
1356
1357 class LUAddNode(LogicalUnit):
1358   """Logical unit for adding node to the cluster.
1359
1360   """
1361   HPATH = "node-add"
1362   HTYPE = constants.HTYPE_NODE
1363   _OP_REQP = ["node_name"]
1364
1365   def BuildHooksEnv(self):
1366     """Build hooks env.
1367
1368     This will run on all nodes before, and on all nodes + the new node after.
1369
1370     """
1371     env = {
1372       "OP_TARGET": self.op.node_name,
1373       "NODE_NAME": self.op.node_name,
1374       "NODE_PIP": self.op.primary_ip,
1375       "NODE_SIP": self.op.secondary_ip,
1376       }
1377     nodes_0 = self.cfg.GetNodeList()
1378     nodes_1 = nodes_0 + [self.op.node_name, ]
1379     return env, nodes_0, nodes_1
1380
1381   def CheckPrereq(self):
1382     """Check prerequisites.
1383
1384     This checks:
1385      - the new node is not already in the config
1386      - it is resolvable
1387      - its parameters (single/dual homed) matches the cluster
1388
1389     Any errors are signalled by raising errors.OpPrereqError.
1390
1391     """
1392     node_name = self.op.node_name
1393     cfg = self.cfg
1394
1395     dns_data = utils.HostInfo(node_name)
1396
1397     node = dns_data.name
1398     primary_ip = self.op.primary_ip = dns_data.ip
1399     secondary_ip = getattr(self.op, "secondary_ip", None)
1400     if secondary_ip is None:
1401       secondary_ip = primary_ip
1402     if not utils.IsValidIP(secondary_ip):
1403       raise errors.OpPrereqError("Invalid secondary IP given")
1404     self.op.secondary_ip = secondary_ip
1405     node_list = cfg.GetNodeList()
1406     if node in node_list:
1407       raise errors.OpPrereqError("Node %s is already in the configuration"
1408                                  % node)
1409
1410     for existing_node_name in node_list:
1411       existing_node = cfg.GetNodeInfo(existing_node_name)
1412       if (existing_node.primary_ip == primary_ip or
1413           existing_node.secondary_ip == primary_ip or
1414           existing_node.primary_ip == secondary_ip or
1415           existing_node.secondary_ip == secondary_ip):
1416         raise errors.OpPrereqError("New node ip address(es) conflict with"
1417                                    " existing node %s" % existing_node.name)
1418
1419     # check that the type of the node (single versus dual homed) is the
1420     # same as for the master
1421     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1422     master_singlehomed = myself.secondary_ip == myself.primary_ip
1423     newbie_singlehomed = secondary_ip == primary_ip
1424     if master_singlehomed != newbie_singlehomed:
1425       if master_singlehomed:
1426         raise errors.OpPrereqError("The master has no private ip but the"
1427                                    " new node has one")
1428       else:
1429         raise errors.OpPrereqError("The master has a private ip but the"
1430                                    " new node doesn't have one")
1431
1432     # checks reachablity
1433     if not utils.TcpPing(utils.HostInfo().name,
1434                          primary_ip,
1435                          constants.DEFAULT_NODED_PORT):
1436       raise errors.OpPrereqError("Node not reachable by ping")
1437
1438     if not newbie_singlehomed:
1439       # check reachability from my secondary ip to newbie's secondary ip
1440       if not utils.TcpPing(myself.secondary_ip,
1441                            secondary_ip,
1442                            constants.DEFAULT_NODED_PORT):
1443         raise errors.OpPrereqError(
1444           "Node secondary ip not reachable by TCP based ping to noded port")
1445
1446     self.new_node = objects.Node(name=node,
1447                                  primary_ip=primary_ip,
1448                                  secondary_ip=secondary_ip)
1449
1450   def Exec(self, feedback_fn):
1451     """Adds the new node to the cluster.
1452
1453     """
1454     new_node = self.new_node
1455     node = new_node.name
1456
1457     # set up inter-node password and certificate and restarts the node daemon
1458     gntpass = self.sstore.GetNodeDaemonPassword()
1459     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1460       raise errors.OpExecError("ganeti password corruption detected")
1461     f = open(constants.SSL_CERT_FILE)
1462     try:
1463       gntpem = f.read(8192)
1464     finally:
1465       f.close()
1466     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1467     # so we use this to detect an invalid certificate; as long as the
1468     # cert doesn't contain this, the here-document will be correctly
1469     # parsed by the shell sequence below
1470     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1471       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1472     if not gntpem.endswith("\n"):
1473       raise errors.OpExecError("PEM must end with newline")
1474     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1475
1476     # and then connect with ssh to set password and start ganeti-noded
1477     # note that all the below variables are sanitized at this point,
1478     # either by being constants or by the checks above
1479     ss = self.sstore
1480     mycommand = ("umask 077 && "
1481                  "echo '%s' > '%s' && "
1482                  "cat > '%s' << '!EOF.' && \n"
1483                  "%s!EOF.\n%s restart" %
1484                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1485                   constants.SSL_CERT_FILE, gntpem,
1486                   constants.NODE_INITD_SCRIPT))
1487
1488     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1489     if result.failed:
1490       raise errors.OpExecError("Remote command on node %s, error: %s,"
1491                                " output: %s" %
1492                                (node, result.fail_reason, result.output))
1493
1494     # check connectivity
1495     time.sleep(4)
1496
1497     result = rpc.call_version([node])[node]
1498     if result:
1499       if constants.PROTOCOL_VERSION == result:
1500         logger.Info("communication to node %s fine, sw version %s match" %
1501                     (node, result))
1502       else:
1503         raise errors.OpExecError("Version mismatch master version %s,"
1504                                  " node version %s" %
1505                                  (constants.PROTOCOL_VERSION, result))
1506     else:
1507       raise errors.OpExecError("Cannot get version from the new node")
1508
1509     # setup ssh on node
1510     logger.Info("copy ssh key to node %s" % node)
1511     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1512     keyarray = []
1513     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1514                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1515                 priv_key, pub_key]
1516
1517     for i in keyfiles:
1518       f = open(i, 'r')
1519       try:
1520         keyarray.append(f.read())
1521       finally:
1522         f.close()
1523
1524     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1525                                keyarray[3], keyarray[4], keyarray[5])
1526
1527     if not result:
1528       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1529
1530     # Add node to our /etc/hosts, and add key to known_hosts
1531     _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1532     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1533                       self.cfg.GetHostKey())
1534
1535     if new_node.secondary_ip != new_node.primary_ip:
1536       if not rpc.call_node_tcp_ping(new_node.name,
1537                                     constants.LOCALHOST_IP_ADDRESS,
1538                                     new_node.secondary_ip,
1539                                     constants.DEFAULT_NODED_PORT,
1540                                     10, False):
1541         raise errors.OpExecError("Node claims it doesn't have the"
1542                                  " secondary ip you gave (%s).\n"
1543                                  "Please fix and re-run this command." %
1544                                  new_node.secondary_ip)
1545
1546     success, msg = ssh.VerifyNodeHostname(node)
1547     if not success:
1548       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1549                                " than the one the resolver gives: %s.\n"
1550                                "Please fix and re-run this command." %
1551                                (node, msg))
1552
1553     # Distribute updated /etc/hosts and known_hosts to all nodes,
1554     # including the node just added
1555     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1556     dist_nodes = self.cfg.GetNodeList() + [node]
1557     if myself.name in dist_nodes:
1558       dist_nodes.remove(myself.name)
1559
1560     logger.Debug("Copying hosts and known_hosts to all nodes")
1561     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1562       result = rpc.call_upload_file(dist_nodes, fname)
1563       for to_node in dist_nodes:
1564         if not result[to_node]:
1565           logger.Error("copy of file %s to node %s failed" %
1566                        (fname, to_node))
1567
1568     to_copy = ss.GetFileList()
1569     for fname in to_copy:
1570       if not ssh.CopyFileToNode(node, fname):
1571         logger.Error("could not copy file %s to node %s" % (fname, node))
1572
1573     logger.Info("adding node %s to cluster.conf" % node)
1574     self.cfg.AddNode(new_node)
1575
1576
1577 class LUMasterFailover(LogicalUnit):
1578   """Failover the master node to the current node.
1579
1580   This is a special LU in that it must run on a non-master node.
1581
1582   """
1583   HPATH = "master-failover"
1584   HTYPE = constants.HTYPE_CLUSTER
1585   REQ_MASTER = False
1586   _OP_REQP = []
1587
1588   def BuildHooksEnv(self):
1589     """Build hooks env.
1590
1591     This will run on the new master only in the pre phase, and on all
1592     the nodes in the post phase.
1593
1594     """
1595     env = {
1596       "OP_TARGET": self.new_master,
1597       "NEW_MASTER": self.new_master,
1598       "OLD_MASTER": self.old_master,
1599       }
1600     return env, [self.new_master], self.cfg.GetNodeList()
1601
1602   def CheckPrereq(self):
1603     """Check prerequisites.
1604
1605     This checks that we are not already the master.
1606
1607     """
1608     self.new_master = utils.HostInfo().name
1609     self.old_master = self.sstore.GetMasterNode()
1610
1611     if self.old_master == self.new_master:
1612       raise errors.OpPrereqError("This commands must be run on the node"
1613                                  " where you want the new master to be.\n"
1614                                  "%s is already the master" %
1615                                  self.old_master)
1616
1617   def Exec(self, feedback_fn):
1618     """Failover the master node.
1619
1620     This command, when run on a non-master node, will cause the current
1621     master to cease being master, and the non-master to become new
1622     master.
1623
1624     """
1625     #TODO: do not rely on gethostname returning the FQDN
1626     logger.Info("setting master to %s, old master: %s" %
1627                 (self.new_master, self.old_master))
1628
1629     if not rpc.call_node_stop_master(self.old_master):
1630       logger.Error("could disable the master role on the old master"
1631                    " %s, please disable manually" % self.old_master)
1632
1633     ss = self.sstore
1634     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1635     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1636                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1637       logger.Error("could not distribute the new simple store master file"
1638                    " to the other nodes, please check.")
1639
1640     if not rpc.call_node_start_master(self.new_master):
1641       logger.Error("could not start the master role on the new master"
1642                    " %s, please check" % self.new_master)
1643       feedback_fn("Error in activating the master IP on the new master,\n"
1644                   "please fix manually.")
1645
1646
1647
1648 class LUQueryClusterInfo(NoHooksLU):
1649   """Query cluster configuration.
1650
1651   """
1652   _OP_REQP = []
1653   REQ_MASTER = False
1654
1655   def CheckPrereq(self):
1656     """No prerequsites needed for this LU.
1657
1658     """
1659     pass
1660
1661   def Exec(self, feedback_fn):
1662     """Return cluster config.
1663
1664     """
1665     result = {
1666       "name": self.sstore.GetClusterName(),
1667       "software_version": constants.RELEASE_VERSION,
1668       "protocol_version": constants.PROTOCOL_VERSION,
1669       "config_version": constants.CONFIG_VERSION,
1670       "os_api_version": constants.OS_API_VERSION,
1671       "export_version": constants.EXPORT_VERSION,
1672       "master": self.sstore.GetMasterNode(),
1673       "architecture": (platform.architecture()[0], platform.machine()),
1674       }
1675
1676     return result
1677
1678
1679 class LUClusterCopyFile(NoHooksLU):
1680   """Copy file to cluster.
1681
1682   """
1683   _OP_REQP = ["nodes", "filename"]
1684
1685   def CheckPrereq(self):
1686     """Check prerequisites.
1687
1688     It should check that the named file exists and that the given list
1689     of nodes is valid.
1690
1691     """
1692     if not os.path.exists(self.op.filename):
1693       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1694
1695     self.nodes = _GetWantedNodes(self, self.op.nodes)
1696
1697   def Exec(self, feedback_fn):
1698     """Copy a file from master to some nodes.
1699
1700     Args:
1701       opts - class with options as members
1702       args - list containing a single element, the file name
1703     Opts used:
1704       nodes - list containing the name of target nodes; if empty, all nodes
1705
1706     """
1707     filename = self.op.filename
1708
1709     myname = utils.HostInfo().name
1710
1711     for node in self.nodes:
1712       if node == myname:
1713         continue
1714       if not ssh.CopyFileToNode(node, filename):
1715         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1716
1717
1718 class LUDumpClusterConfig(NoHooksLU):
1719   """Return a text-representation of the cluster-config.
1720
1721   """
1722   _OP_REQP = []
1723
1724   def CheckPrereq(self):
1725     """No prerequisites.
1726
1727     """
1728     pass
1729
1730   def Exec(self, feedback_fn):
1731     """Dump a representation of the cluster config to the standard output.
1732
1733     """
1734     return self.cfg.DumpConfig()
1735
1736
1737 class LURunClusterCommand(NoHooksLU):
1738   """Run a command on some nodes.
1739
1740   """
1741   _OP_REQP = ["command", "nodes"]
1742
1743   def CheckPrereq(self):
1744     """Check prerequisites.
1745
1746     It checks that the given list of nodes is valid.
1747
1748     """
1749     self.nodes = _GetWantedNodes(self, self.op.nodes)
1750
1751   def Exec(self, feedback_fn):
1752     """Run a command on some nodes.
1753
1754     """
1755     data = []
1756     for node in self.nodes:
1757       result = ssh.SSHCall(node, "root", self.op.command)
1758       data.append((node, result.output, result.exit_code))
1759
1760     return data
1761
1762
1763 class LUActivateInstanceDisks(NoHooksLU):
1764   """Bring up an instance's disks.
1765
1766   """
1767   _OP_REQP = ["instance_name"]
1768
1769   def CheckPrereq(self):
1770     """Check prerequisites.
1771
1772     This checks that the instance is in the cluster.
1773
1774     """
1775     instance = self.cfg.GetInstanceInfo(
1776       self.cfg.ExpandInstanceName(self.op.instance_name))
1777     if instance is None:
1778       raise errors.OpPrereqError("Instance '%s' not known" %
1779                                  self.op.instance_name)
1780     self.instance = instance
1781
1782
1783   def Exec(self, feedback_fn):
1784     """Activate the disks.
1785
1786     """
1787     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1788     if not disks_ok:
1789       raise errors.OpExecError("Cannot activate block devices")
1790
1791     return disks_info
1792
1793
1794 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1795   """Prepare the block devices for an instance.
1796
1797   This sets up the block devices on all nodes.
1798
1799   Args:
1800     instance: a ganeti.objects.Instance object
1801     ignore_secondaries: if true, errors on secondary nodes won't result
1802                         in an error return from the function
1803
1804   Returns:
1805     false if the operation failed
1806     list of (host, instance_visible_name, node_visible_name) if the operation
1807          suceeded with the mapping from node devices to instance devices
1808   """
1809   device_info = []
1810   disks_ok = True
1811   for inst_disk in instance.disks:
1812     master_result = None
1813     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1814       cfg.SetDiskID(node_disk, node)
1815       is_primary = node == instance.primary_node
1816       result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1817       if not result:
1818         logger.Error("could not prepare block device %s on node %s (is_pri"
1819                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1820         if is_primary or not ignore_secondaries:
1821           disks_ok = False
1822       if is_primary:
1823         master_result = result
1824     device_info.append((instance.primary_node, inst_disk.iv_name,
1825                         master_result))
1826
1827   return disks_ok, device_info
1828
1829
1830 def _StartInstanceDisks(cfg, instance, force):
1831   """Start the disks of an instance.
1832
1833   """
1834   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1835                                            ignore_secondaries=force)
1836   if not disks_ok:
1837     _ShutdownInstanceDisks(instance, cfg)
1838     if force is not None and not force:
1839       logger.Error("If the message above refers to a secondary node,"
1840                    " you can retry the operation using '--force'.")
1841     raise errors.OpExecError("Disk consistency error")
1842
1843
1844 class LUDeactivateInstanceDisks(NoHooksLU):
1845   """Shutdown an instance's disks.
1846
1847   """
1848   _OP_REQP = ["instance_name"]
1849
1850   def CheckPrereq(self):
1851     """Check prerequisites.
1852
1853     This checks that the instance is in the cluster.
1854
1855     """
1856     instance = self.cfg.GetInstanceInfo(
1857       self.cfg.ExpandInstanceName(self.op.instance_name))
1858     if instance is None:
1859       raise errors.OpPrereqError("Instance '%s' not known" %
1860                                  self.op.instance_name)
1861     self.instance = instance
1862
1863   def Exec(self, feedback_fn):
1864     """Deactivate the disks
1865
1866     """
1867     instance = self.instance
1868     ins_l = rpc.call_instance_list([instance.primary_node])
1869     ins_l = ins_l[instance.primary_node]
1870     if not type(ins_l) is list:
1871       raise errors.OpExecError("Can't contact node '%s'" %
1872                                instance.primary_node)
1873
1874     if self.instance.name in ins_l:
1875       raise errors.OpExecError("Instance is running, can't shutdown"
1876                                " block devices.")
1877
1878     _ShutdownInstanceDisks(instance, self.cfg)
1879
1880
1881 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1882   """Shutdown block devices of an instance.
1883
1884   This does the shutdown on all nodes of the instance.
1885
1886   If the ignore_primary is false, errors on the primary node are
1887   ignored.
1888
1889   """
1890   result = True
1891   for disk in instance.disks:
1892     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1893       cfg.SetDiskID(top_disk, node)
1894       if not rpc.call_blockdev_shutdown(node, top_disk):
1895         logger.Error("could not shutdown block device %s on node %s" %
1896                      (disk.iv_name, node))
1897         if not ignore_primary or node != instance.primary_node:
1898           result = False
1899   return result
1900
1901
1902 class LUStartupInstance(LogicalUnit):
1903   """Starts an instance.
1904
1905   """
1906   HPATH = "instance-start"
1907   HTYPE = constants.HTYPE_INSTANCE
1908   _OP_REQP = ["instance_name", "force"]
1909
1910   def BuildHooksEnv(self):
1911     """Build hooks env.
1912
1913     This runs on master, primary and secondary nodes of the instance.
1914
1915     """
1916     env = {
1917       "FORCE": self.op.force,
1918       }
1919     env.update(_BuildInstanceHookEnvByObject(self.instance))
1920     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1921           list(self.instance.secondary_nodes))
1922     return env, nl, nl
1923
1924   def CheckPrereq(self):
1925     """Check prerequisites.
1926
1927     This checks that the instance is in the cluster.
1928
1929     """
1930     instance = self.cfg.GetInstanceInfo(
1931       self.cfg.ExpandInstanceName(self.op.instance_name))
1932     if instance is None:
1933       raise errors.OpPrereqError("Instance '%s' not known" %
1934                                  self.op.instance_name)
1935
1936     # check bridges existance
1937     brlist = [nic.bridge for nic in instance.nics]
1938     if not rpc.call_bridges_exist(instance.primary_node, brlist):
1939       raise errors.OpPrereqError("one or more target bridges %s does not"
1940                                  " exist on destination node '%s'" %
1941                                  (brlist, instance.primary_node))
1942
1943     self.instance = instance
1944     self.op.instance_name = instance.name
1945
1946   def Exec(self, feedback_fn):
1947     """Start the instance.
1948
1949     """
1950     instance = self.instance
1951     force = self.op.force
1952     extra_args = getattr(self.op, "extra_args", "")
1953
1954     node_current = instance.primary_node
1955
1956     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1957     if not nodeinfo:
1958       raise errors.OpExecError("Could not contact node %s for infos" %
1959                                (node_current))
1960
1961     freememory = nodeinfo[node_current]['memory_free']
1962     memory = instance.memory
1963     if memory > freememory:
1964       raise errors.OpExecError("Not enough memory to start instance"
1965                                " %s on node %s"
1966                                " needed %s MiB, available %s MiB" %
1967                                (instance.name, node_current, memory,
1968                                 freememory))
1969
1970     _StartInstanceDisks(self.cfg, instance, force)
1971
1972     if not rpc.call_instance_start(node_current, instance, extra_args):
1973       _ShutdownInstanceDisks(instance, self.cfg)
1974       raise errors.OpExecError("Could not start instance")
1975
1976     self.cfg.MarkInstanceUp(instance.name)
1977
1978
1979 class LUShutdownInstance(LogicalUnit):
1980   """Shutdown an instance.
1981
1982   """
1983   HPATH = "instance-stop"
1984   HTYPE = constants.HTYPE_INSTANCE
1985   _OP_REQP = ["instance_name"]
1986
1987   def BuildHooksEnv(self):
1988     """Build hooks env.
1989
1990     This runs on master, primary and secondary nodes of the instance.
1991
1992     """
1993     env = _BuildInstanceHookEnvByObject(self.instance)
1994     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1995           list(self.instance.secondary_nodes))
1996     return env, nl, nl
1997
1998   def CheckPrereq(self):
1999     """Check prerequisites.
2000
2001     This checks that the instance is in the cluster.
2002
2003     """
2004     instance = self.cfg.GetInstanceInfo(
2005       self.cfg.ExpandInstanceName(self.op.instance_name))
2006     if instance is None:
2007       raise errors.OpPrereqError("Instance '%s' not known" %
2008                                  self.op.instance_name)
2009     self.instance = instance
2010
2011   def Exec(self, feedback_fn):
2012     """Shutdown the instance.
2013
2014     """
2015     instance = self.instance
2016     node_current = instance.primary_node
2017     if not rpc.call_instance_shutdown(node_current, instance):
2018       logger.Error("could not shutdown instance")
2019
2020     self.cfg.MarkInstanceDown(instance.name)
2021     _ShutdownInstanceDisks(instance, self.cfg)
2022
2023
2024 class LUReinstallInstance(LogicalUnit):
2025   """Reinstall an instance.
2026
2027   """
2028   HPATH = "instance-reinstall"
2029   HTYPE = constants.HTYPE_INSTANCE
2030   _OP_REQP = ["instance_name"]
2031
2032   def BuildHooksEnv(self):
2033     """Build hooks env.
2034
2035     This runs on master, primary and secondary nodes of the instance.
2036
2037     """
2038     env = _BuildInstanceHookEnvByObject(self.instance)
2039     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2040           list(self.instance.secondary_nodes))
2041     return env, nl, nl
2042
2043   def CheckPrereq(self):
2044     """Check prerequisites.
2045
2046     This checks that the instance is in the cluster and is not running.
2047
2048     """
2049     instance = self.cfg.GetInstanceInfo(
2050       self.cfg.ExpandInstanceName(self.op.instance_name))
2051     if instance is None:
2052       raise errors.OpPrereqError("Instance '%s' not known" %
2053                                  self.op.instance_name)
2054     if instance.disk_template == constants.DT_DISKLESS:
2055       raise errors.OpPrereqError("Instance '%s' has no disks" %
2056                                  self.op.instance_name)
2057     if instance.status != "down":
2058       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2059                                  self.op.instance_name)
2060     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2061     if remote_info:
2062       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2063                                  (self.op.instance_name,
2064                                   instance.primary_node))
2065
2066     self.op.os_type = getattr(self.op, "os_type", None)
2067     if self.op.os_type is not None:
2068       # OS verification
2069       pnode = self.cfg.GetNodeInfo(
2070         self.cfg.ExpandNodeName(instance.primary_node))
2071       if pnode is None:
2072         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2073                                    self.op.pnode)
2074       os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2075       if not isinstance(os_obj, objects.OS):
2076         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2077                                    " primary node"  % self.op.os_type)
2078
2079     self.instance = instance
2080
2081   def Exec(self, feedback_fn):
2082     """Reinstall the instance.
2083
2084     """
2085     inst = self.instance
2086
2087     if self.op.os_type is not None:
2088       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2089       inst.os = self.op.os_type
2090       self.cfg.AddInstance(inst)
2091
2092     _StartInstanceDisks(self.cfg, inst, None)
2093     try:
2094       feedback_fn("Running the instance OS create scripts...")
2095       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2096         raise errors.OpExecError("Could not install OS for instance %s "
2097                                  "on node %s" %
2098                                  (inst.name, inst.primary_node))
2099     finally:
2100       _ShutdownInstanceDisks(inst, self.cfg)
2101
2102
2103 class LURenameInstance(LogicalUnit):
2104   """Rename an instance.
2105
2106   """
2107   HPATH = "instance-rename"
2108   HTYPE = constants.HTYPE_INSTANCE
2109   _OP_REQP = ["instance_name", "new_name"]
2110
2111   def BuildHooksEnv(self):
2112     """Build hooks env.
2113
2114     This runs on master, primary and secondary nodes of the instance.
2115
2116     """
2117     env = _BuildInstanceHookEnvByObject(self.instance)
2118     env["INSTANCE_NEW_NAME"] = self.op.new_name
2119     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2120           list(self.instance.secondary_nodes))
2121     return env, nl, nl
2122
2123   def CheckPrereq(self):
2124     """Check prerequisites.
2125
2126     This checks that the instance is in the cluster and is not running.
2127
2128     """
2129     instance = self.cfg.GetInstanceInfo(
2130       self.cfg.ExpandInstanceName(self.op.instance_name))
2131     if instance is None:
2132       raise errors.OpPrereqError("Instance '%s' not known" %
2133                                  self.op.instance_name)
2134     if instance.status != "down":
2135       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2136                                  self.op.instance_name)
2137     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2138     if remote_info:
2139       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2140                                  (self.op.instance_name,
2141                                   instance.primary_node))
2142     self.instance = instance
2143
2144     # new name verification
2145     name_info = utils.HostInfo(self.op.new_name)
2146
2147     self.op.new_name = new_name = name_info.name
2148     if not getattr(self.op, "ignore_ip", False):
2149       command = ["fping", "-q", name_info.ip]
2150       result = utils.RunCmd(command)
2151       if not result.failed:
2152         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2153                                    (name_info.ip, new_name))
2154
2155
2156   def Exec(self, feedback_fn):
2157     """Reinstall the instance.
2158
2159     """
2160     inst = self.instance
2161     old_name = inst.name
2162
2163     self.cfg.RenameInstance(inst.name, self.op.new_name)
2164
2165     # re-read the instance from the configuration after rename
2166     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2167
2168     _StartInstanceDisks(self.cfg, inst, None)
2169     try:
2170       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2171                                           "sda", "sdb"):
2172         msg = ("Could run OS rename script for instance %s\n"
2173                "on node %s\n"
2174                "(but the instance has been renamed in Ganeti)" %
2175                (inst.name, inst.primary_node))
2176         logger.Error(msg)
2177     finally:
2178       _ShutdownInstanceDisks(inst, self.cfg)
2179
2180
2181 class LURemoveInstance(LogicalUnit):
2182   """Remove an instance.
2183
2184   """
2185   HPATH = "instance-remove"
2186   HTYPE = constants.HTYPE_INSTANCE
2187   _OP_REQP = ["instance_name"]
2188
2189   def BuildHooksEnv(self):
2190     """Build hooks env.
2191
2192     This runs on master, primary and secondary nodes of the instance.
2193
2194     """
2195     env = _BuildInstanceHookEnvByObject(self.instance)
2196     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2197           list(self.instance.secondary_nodes))
2198     return env, nl, nl
2199
2200   def CheckPrereq(self):
2201     """Check prerequisites.
2202
2203     This checks that the instance is in the cluster.
2204
2205     """
2206     instance = self.cfg.GetInstanceInfo(
2207       self.cfg.ExpandInstanceName(self.op.instance_name))
2208     if instance is None:
2209       raise errors.OpPrereqError("Instance '%s' not known" %
2210                                  self.op.instance_name)
2211     self.instance = instance
2212
2213   def Exec(self, feedback_fn):
2214     """Remove the instance.
2215
2216     """
2217     instance = self.instance
2218     logger.Info("shutting down instance %s on node %s" %
2219                 (instance.name, instance.primary_node))
2220
2221     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2222       raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2223                                (instance.name, instance.primary_node))
2224
2225     logger.Info("removing block devices for instance %s" % instance.name)
2226
2227     _RemoveDisks(instance, self.cfg)
2228
2229     logger.Info("removing instance %s out of cluster config" % instance.name)
2230
2231     self.cfg.RemoveInstance(instance.name)
2232
2233
2234 class LUQueryInstances(NoHooksLU):
2235   """Logical unit for querying instances.
2236
2237   """
2238   _OP_REQP = ["output_fields", "names"]
2239
2240   def CheckPrereq(self):
2241     """Check prerequisites.
2242
2243     This checks that the fields required are valid output fields.
2244
2245     """
2246     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2247     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2248                                "admin_state", "admin_ram",
2249                                "disk_template", "ip", "mac", "bridge",
2250                                "sda_size", "sdb_size"],
2251                        dynamic=self.dynamic_fields,
2252                        selected=self.op.output_fields)
2253
2254     self.wanted = _GetWantedInstances(self, self.op.names)
2255
2256   def Exec(self, feedback_fn):
2257     """Computes the list of nodes and their attributes.
2258
2259     """
2260     instance_names = self.wanted
2261     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2262                      in instance_names]
2263
2264     # begin data gathering
2265
2266     nodes = frozenset([inst.primary_node for inst in instance_list])
2267
2268     bad_nodes = []
2269     if self.dynamic_fields.intersection(self.op.output_fields):
2270       live_data = {}
2271       node_data = rpc.call_all_instances_info(nodes)
2272       for name in nodes:
2273         result = node_data[name]
2274         if result:
2275           live_data.update(result)
2276         elif result == False:
2277           bad_nodes.append(name)
2278         # else no instance is alive
2279     else:
2280       live_data = dict([(name, {}) for name in instance_names])
2281
2282     # end data gathering
2283
2284     output = []
2285     for instance in instance_list:
2286       iout = []
2287       for field in self.op.output_fields:
2288         if field == "name":
2289           val = instance.name
2290         elif field == "os":
2291           val = instance.os
2292         elif field == "pnode":
2293           val = instance.primary_node
2294         elif field == "snodes":
2295           val = list(instance.secondary_nodes)
2296         elif field == "admin_state":
2297           val = (instance.status != "down")
2298         elif field == "oper_state":
2299           if instance.primary_node in bad_nodes:
2300             val = None
2301           else:
2302             val = bool(live_data.get(instance.name))
2303         elif field == "admin_ram":
2304           val = instance.memory
2305         elif field == "oper_ram":
2306           if instance.primary_node in bad_nodes:
2307             val = None
2308           elif instance.name in live_data:
2309             val = live_data[instance.name].get("memory", "?")
2310           else:
2311             val = "-"
2312         elif field == "disk_template":
2313           val = instance.disk_template
2314         elif field == "ip":
2315           val = instance.nics[0].ip
2316         elif field == "bridge":
2317           val = instance.nics[0].bridge
2318         elif field == "mac":
2319           val = instance.nics[0].mac
2320         elif field == "sda_size" or field == "sdb_size":
2321           disk = instance.FindDisk(field[:3])
2322           if disk is None:
2323             val = None
2324           else:
2325             val = disk.size
2326         else:
2327           raise errors.ParameterError(field)
2328         iout.append(val)
2329       output.append(iout)
2330
2331     return output
2332
2333
2334 class LUFailoverInstance(LogicalUnit):
2335   """Failover an instance.
2336
2337   """
2338   HPATH = "instance-failover"
2339   HTYPE = constants.HTYPE_INSTANCE
2340   _OP_REQP = ["instance_name", "ignore_consistency"]
2341
2342   def BuildHooksEnv(self):
2343     """Build hooks env.
2344
2345     This runs on master, primary and secondary nodes of the instance.
2346
2347     """
2348     env = {
2349       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2350       }
2351     env.update(_BuildInstanceHookEnvByObject(self.instance))
2352     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2353     return env, nl, nl
2354
2355   def CheckPrereq(self):
2356     """Check prerequisites.
2357
2358     This checks that the instance is in the cluster.
2359
2360     """
2361     instance = self.cfg.GetInstanceInfo(
2362       self.cfg.ExpandInstanceName(self.op.instance_name))
2363     if instance is None:
2364       raise errors.OpPrereqError("Instance '%s' not known" %
2365                                  self.op.instance_name)
2366
2367     if instance.disk_template != constants.DT_REMOTE_RAID1:
2368       raise errors.OpPrereqError("Instance's disk layout is not"
2369                                  " remote_raid1.")
2370
2371     secondary_nodes = instance.secondary_nodes
2372     if not secondary_nodes:
2373       raise errors.ProgrammerError("no secondary node but using "
2374                                    "DT_REMOTE_RAID1 template")
2375
2376     # check memory requirements on the secondary node
2377     target_node = secondary_nodes[0]
2378     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2379     info = nodeinfo.get(target_node, None)
2380     if not info:
2381       raise errors.OpPrereqError("Cannot get current information"
2382                                  " from node '%s'" % nodeinfo)
2383     if instance.memory > info['memory_free']:
2384       raise errors.OpPrereqError("Not enough memory on target node %s."
2385                                  " %d MB available, %d MB required" %
2386                                  (target_node, info['memory_free'],
2387                                   instance.memory))
2388
2389     # check bridge existance
2390     brlist = [nic.bridge for nic in instance.nics]
2391     if not rpc.call_bridges_exist(instance.primary_node, brlist):
2392       raise errors.OpPrereqError("One or more target bridges %s does not"
2393                                  " exist on destination node '%s'" %
2394                                  (brlist, instance.primary_node))
2395
2396     self.instance = instance
2397
2398   def Exec(self, feedback_fn):
2399     """Failover an instance.
2400
2401     The failover is done by shutting it down on its present node and
2402     starting it on the secondary.
2403
2404     """
2405     instance = self.instance
2406
2407     source_node = instance.primary_node
2408     target_node = instance.secondary_nodes[0]
2409
2410     feedback_fn("* checking disk consistency between source and target")
2411     for dev in instance.disks:
2412       # for remote_raid1, these are md over drbd
2413       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2414         if not self.op.ignore_consistency:
2415           raise errors.OpExecError("Disk %s is degraded on target node,"
2416                                    " aborting failover." % dev.iv_name)
2417
2418     feedback_fn("* checking target node resource availability")
2419     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2420
2421     if not nodeinfo:
2422       raise errors.OpExecError("Could not contact target node %s." %
2423                                target_node)
2424
2425     free_memory = int(nodeinfo[target_node]['memory_free'])
2426     memory = instance.memory
2427     if memory > free_memory:
2428       raise errors.OpExecError("Not enough memory to create instance %s on"
2429                                " node %s. needed %s MiB, available %s MiB" %
2430                                (instance.name, target_node, memory,
2431                                 free_memory))
2432
2433     feedback_fn("* shutting down instance on source node")
2434     logger.Info("Shutting down instance %s on node %s" %
2435                 (instance.name, source_node))
2436
2437     if not rpc.call_instance_shutdown(source_node, instance):
2438       logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2439                    " anyway. Please make sure node %s is down"  %
2440                    (instance.name, source_node, source_node))
2441
2442     feedback_fn("* deactivating the instance's disks on source node")
2443     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2444       raise errors.OpExecError("Can't shut down the instance's disks.")
2445
2446     instance.primary_node = target_node
2447     # distribute new instance config to the other nodes
2448     self.cfg.AddInstance(instance)
2449
2450     feedback_fn("* activating the instance's disks on target node")
2451     logger.Info("Starting instance %s on node %s" %
2452                 (instance.name, target_node))
2453
2454     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2455                                              ignore_secondaries=True)
2456     if not disks_ok:
2457       _ShutdownInstanceDisks(instance, self.cfg)
2458       raise errors.OpExecError("Can't activate the instance's disks")
2459
2460     feedback_fn("* starting the instance on the target node")
2461     if not rpc.call_instance_start(target_node, instance, None):
2462       _ShutdownInstanceDisks(instance, self.cfg)
2463       raise errors.OpExecError("Could not start instance %s on node %s." %
2464                                (instance.name, target_node))
2465
2466
2467 def _CreateBlockDevOnPrimary(cfg, node, device, info):
2468   """Create a tree of block devices on the primary node.
2469
2470   This always creates all devices.
2471
2472   """
2473   if device.children:
2474     for child in device.children:
2475       if not _CreateBlockDevOnPrimary(cfg, node, child, info):
2476         return False
2477
2478   cfg.SetDiskID(device, node)
2479   new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
2480   if not new_id:
2481     return False
2482   if device.physical_id is None:
2483     device.physical_id = new_id
2484   return True
2485
2486
2487 def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
2488   """Create a tree of block devices on a secondary node.
2489
2490   If this device type has to be created on secondaries, create it and
2491   all its children.
2492
2493   If not, just recurse to children keeping the same 'force' value.
2494
2495   """
2496   if device.CreateOnSecondary():
2497     force = True
2498   if device.children:
2499     for child in device.children:
2500       if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
2501         return False
2502
2503   if not force:
2504     return True
2505   cfg.SetDiskID(device, node)
2506   new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
2507   if not new_id:
2508     return False
2509   if device.physical_id is None:
2510     device.physical_id = new_id
2511   return True
2512
2513
2514 def _GenerateUniqueNames(cfg, exts):
2515   """Generate a suitable LV name.
2516
2517   This will generate a logical volume name for the given instance.
2518
2519   """
2520   results = []
2521   for val in exts:
2522     new_id = cfg.GenerateUniqueID()
2523     results.append("%s%s" % (new_id, val))
2524   return results
2525
2526
2527 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2528   """Generate a drbd device complete with its children.
2529
2530   """
2531   port = cfg.AllocatePort()
2532   vgname = cfg.GetVGName()
2533   dev_data = objects.Disk(dev_type="lvm", size=size,
2534                           logical_id=(vgname, names[0]))
2535   dev_meta = objects.Disk(dev_type="lvm", size=128,
2536                           logical_id=(vgname, names[1]))
2537   drbd_dev = objects.Disk(dev_type="drbd", size=size,
2538                           logical_id = (primary, secondary, port),
2539                           children = [dev_data, dev_meta])
2540   return drbd_dev
2541
2542
2543 def _GenerateDiskTemplate(cfg, template_name,
2544                           instance_name, primary_node,
2545                           secondary_nodes, disk_sz, swap_sz):
2546   """Generate the entire disk layout for a given template type.
2547
2548   """
2549   #TODO: compute space requirements
2550
2551   vgname = cfg.GetVGName()
2552   if template_name == "diskless":
2553     disks = []
2554   elif template_name == "plain":
2555     if len(secondary_nodes) != 0:
2556       raise errors.ProgrammerError("Wrong template configuration")
2557
2558     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2559     sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
2560                            logical_id=(vgname, names[0]),
2561                            iv_name = "sda")
2562     sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
2563                            logical_id=(vgname, names[1]),
2564                            iv_name = "sdb")
2565     disks = [sda_dev, sdb_dev]
2566   elif template_name == "local_raid1":
2567     if len(secondary_nodes) != 0:
2568       raise errors.ProgrammerError("Wrong template configuration")
2569
2570
2571     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2572                                        ".sdb_m1", ".sdb_m2"])
2573     sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
2574                               logical_id=(vgname, names[0]))
2575     sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
2576                               logical_id=(vgname, names[1]))
2577     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name = "sda",
2578                               size=disk_sz,
2579                               children = [sda_dev_m1, sda_dev_m2])
2580     sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
2581                               logical_id=(vgname, names[2]))
2582     sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
2583                               logical_id=(vgname, names[3]))
2584     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name = "sdb",
2585                               size=swap_sz,
2586                               children = [sdb_dev_m1, sdb_dev_m2])
2587     disks = [md_sda_dev, md_sdb_dev]
2588   elif template_name == constants.DT_REMOTE_RAID1:
2589     if len(secondary_nodes) != 1:
2590       raise errors.ProgrammerError("Wrong template configuration")
2591     remote_node = secondary_nodes[0]
2592     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2593                                        ".sdb_data", ".sdb_meta"])
2594     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2595                                          disk_sz, names[0:2])
2596     md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
2597                               children = [drbd_sda_dev], size=disk_sz)
2598     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2599                                          swap_sz, names[2:4])
2600     md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
2601                               children = [drbd_sdb_dev], size=swap_sz)
2602     disks = [md_sda_dev, md_sdb_dev]
2603   else:
2604     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2605   return disks
2606
2607
2608 def _GetInstanceInfoText(instance):
2609   """Compute that text that should be added to the disk's metadata.
2610
2611   """
2612   return "originstname+%s" % instance.name
2613
2614
2615 def _CreateDisks(cfg, instance):
2616   """Create all disks for an instance.
2617
2618   This abstracts away some work from AddInstance.
2619
2620   Args:
2621     instance: the instance object
2622
2623   Returns:
2624     True or False showing the success of the creation process
2625
2626   """
2627   info = _GetInstanceInfoText(instance)
2628
2629   for device in instance.disks:
2630     logger.Info("creating volume %s for instance %s" %
2631               (device.iv_name, instance.name))
2632     #HARDCODE
2633     for secondary_node in instance.secondary_nodes:
2634       if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
2635                                         info):
2636         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2637                      (device.iv_name, device, secondary_node))
2638         return False
2639     #HARDCODE
2640     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
2641       logger.Error("failed to create volume %s on primary!" %
2642                    device.iv_name)
2643       return False
2644   return True
2645
2646
2647 def _RemoveDisks(instance, cfg):
2648   """Remove all disks for an instance.
2649
2650   This abstracts away some work from `AddInstance()` and
2651   `RemoveInstance()`. Note that in case some of the devices couldn't
2652   be remove, the removal will continue with the other ones (compare
2653   with `_CreateDisks()`).
2654
2655   Args:
2656     instance: the instance object
2657
2658   Returns:
2659     True or False showing the success of the removal proces
2660
2661   """
2662   logger.Info("removing block devices for instance %s" % instance.name)
2663
2664   result = True
2665   for device in instance.disks:
2666     for node, disk in device.ComputeNodeTree(instance.primary_node):
2667       cfg.SetDiskID(disk, node)
2668       if not rpc.call_blockdev_remove(node, disk):
2669         logger.Error("could not remove block device %s on node %s,"
2670                      " continuing anyway" %
2671                      (device.iv_name, node))
2672         result = False
2673   return result
2674
2675
2676 class LUCreateInstance(LogicalUnit):
2677   """Create an instance.
2678
2679   """
2680   HPATH = "instance-add"
2681   HTYPE = constants.HTYPE_INSTANCE
2682   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2683               "disk_template", "swap_size", "mode", "start", "vcpus",
2684               "wait_for_sync", "ip_check"]
2685
2686   def BuildHooksEnv(self):
2687     """Build hooks env.
2688
2689     This runs on master, primary and secondary nodes of the instance.
2690
2691     """
2692     env = {
2693       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2694       "INSTANCE_DISK_SIZE": self.op.disk_size,
2695       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2696       "INSTANCE_ADD_MODE": self.op.mode,
2697       }
2698     if self.op.mode == constants.INSTANCE_IMPORT:
2699       env["INSTANCE_SRC_NODE"] = self.op.src_node
2700       env["INSTANCE_SRC_PATH"] = self.op.src_path
2701       env["INSTANCE_SRC_IMAGE"] = self.src_image
2702
2703     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2704       primary_node=self.op.pnode,
2705       secondary_nodes=self.secondaries,
2706       status=self.instance_status,
2707       os_type=self.op.os_type,
2708       memory=self.op.mem_size,
2709       vcpus=self.op.vcpus,
2710       nics=[(self.inst_ip, self.op.bridge)],
2711     ))
2712
2713     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2714           self.secondaries)
2715     return env, nl, nl
2716
2717
2718   def CheckPrereq(self):
2719     """Check prerequisites.
2720
2721     """
2722     if self.op.mode not in (constants.INSTANCE_CREATE,
2723                             constants.INSTANCE_IMPORT):
2724       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2725                                  self.op.mode)
2726
2727     if self.op.mode == constants.INSTANCE_IMPORT:
2728       src_node = getattr(self.op, "src_node", None)
2729       src_path = getattr(self.op, "src_path", None)
2730       if src_node is None or src_path is None:
2731         raise errors.OpPrereqError("Importing an instance requires source"
2732                                    " node and path options")
2733       src_node_full = self.cfg.ExpandNodeName(src_node)
2734       if src_node_full is None:
2735         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2736       self.op.src_node = src_node = src_node_full
2737
2738       if not os.path.isabs(src_path):
2739         raise errors.OpPrereqError("The source path must be absolute")
2740
2741       export_info = rpc.call_export_info(src_node, src_path)
2742
2743       if not export_info:
2744         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2745
2746       if not export_info.has_section(constants.INISECT_EXP):
2747         raise errors.ProgrammerError("Corrupted export config")
2748
2749       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2750       if (int(ei_version) != constants.EXPORT_VERSION):
2751         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2752                                    (ei_version, constants.EXPORT_VERSION))
2753
2754       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2755         raise errors.OpPrereqError("Can't import instance with more than"
2756                                    " one data disk")
2757
2758       # FIXME: are the old os-es, disk sizes, etc. useful?
2759       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2760       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2761                                                          'disk0_dump'))
2762       self.src_image = diskimage
2763     else: # INSTANCE_CREATE
2764       if getattr(self.op, "os_type", None) is None:
2765         raise errors.OpPrereqError("No guest OS specified")
2766
2767     # check primary node
2768     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2769     if pnode is None:
2770       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2771                                  self.op.pnode)
2772     self.op.pnode = pnode.name
2773     self.pnode = pnode
2774     self.secondaries = []
2775     # disk template and mirror node verification
2776     if self.op.disk_template not in constants.DISK_TEMPLATES:
2777       raise errors.OpPrereqError("Invalid disk template name")
2778
2779     if self.op.disk_template == constants.DT_REMOTE_RAID1:
2780       if getattr(self.op, "snode", None) is None:
2781         raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
2782                                    " a mirror node")
2783
2784       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2785       if snode_name is None:
2786         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2787                                    self.op.snode)
2788       elif snode_name == pnode.name:
2789         raise errors.OpPrereqError("The secondary node cannot be"
2790                                    " the primary node.")
2791       self.secondaries.append(snode_name)
2792
2793     # Check lv size requirements
2794     nodenames = [pnode.name] + self.secondaries
2795     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2796
2797     # Required free disk space as a function of disk and swap space
2798     req_size_dict = {
2799       constants.DT_DISKLESS: 0,
2800       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2801       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2802       # 256 MB are added for drbd metadata, 128MB for each drbd device
2803       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2804     }
2805
2806     if self.op.disk_template not in req_size_dict:
2807       raise errors.ProgrammerError("Disk template '%s' size requirement"
2808                                    " is unknown" %  self.op.disk_template)
2809
2810     req_size = req_size_dict[self.op.disk_template]
2811
2812     for node in nodenames:
2813       info = nodeinfo.get(node, None)
2814       if not info:
2815         raise errors.OpPrereqError("Cannot get current information"
2816                                    " from node '%s'" % nodeinfo)
2817       if req_size > info['vg_free']:
2818         raise errors.OpPrereqError("Not enough disk space on target node %s."
2819                                    " %d MB available, %d MB required" %
2820                                    (node, info['vg_free'], req_size))
2821
2822     # os verification
2823     os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
2824     if not isinstance(os_obj, objects.OS):
2825       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2826                                  " primary node"  % self.op.os_type)
2827
2828     # instance verification
2829     hostname1 = utils.HostInfo(self.op.instance_name)
2830
2831     self.op.instance_name = instance_name = hostname1.name
2832     instance_list = self.cfg.GetInstanceList()
2833     if instance_name in instance_list:
2834       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2835                                  instance_name)
2836
2837     ip = getattr(self.op, "ip", None)
2838     if ip is None or ip.lower() == "none":
2839       inst_ip = None
2840     elif ip.lower() == "auto":
2841       inst_ip = hostname1.ip
2842     else:
2843       if not utils.IsValidIP(ip):
2844         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2845                                    " like a valid IP" % ip)
2846       inst_ip = ip
2847     self.inst_ip = inst_ip
2848
2849     if self.op.start and not self.op.ip_check:
2850       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2851                                  " adding an instance in start mode")
2852
2853     if self.op.ip_check:
2854       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2855                        constants.DEFAULT_NODED_PORT):
2856         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2857                                    (hostname1.ip, instance_name))
2858
2859     # bridge verification
2860     bridge = getattr(self.op, "bridge", None)
2861     if bridge is None:
2862       self.op.bridge = self.cfg.GetDefBridge()
2863     else:
2864       self.op.bridge = bridge
2865
2866     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2867       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2868                                  " destination node '%s'" %
2869                                  (self.op.bridge, pnode.name))
2870
2871     if self.op.start:
2872       self.instance_status = 'up'
2873     else:
2874       self.instance_status = 'down'
2875
2876   def Exec(self, feedback_fn):
2877     """Create and add the instance to the cluster.
2878
2879     """
2880     instance = self.op.instance_name
2881     pnode_name = self.pnode.name
2882
2883     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2884     if self.inst_ip is not None:
2885       nic.ip = self.inst_ip
2886
2887     disks = _GenerateDiskTemplate(self.cfg,
2888                                   self.op.disk_template,
2889                                   instance, pnode_name,
2890                                   self.secondaries, self.op.disk_size,
2891                                   self.op.swap_size)
2892
2893     iobj = objects.Instance(name=instance, os=self.op.os_type,
2894                             primary_node=pnode_name,
2895                             memory=self.op.mem_size,
2896                             vcpus=self.op.vcpus,
2897                             nics=[nic], disks=disks,
2898                             disk_template=self.op.disk_template,
2899                             status=self.instance_status,
2900                             )
2901
2902     feedback_fn("* creating instance disks...")
2903     if not _CreateDisks(self.cfg, iobj):
2904       _RemoveDisks(iobj, self.cfg)
2905       raise errors.OpExecError("Device creation failed, reverting...")
2906
2907     feedback_fn("adding instance %s to cluster config" % instance)
2908
2909     self.cfg.AddInstance(iobj)
2910
2911     if self.op.wait_for_sync:
2912       disk_abort = not _WaitForSync(self.cfg, iobj)
2913     elif iobj.disk_template == constants.DT_REMOTE_RAID1:
2914       # make sure the disks are not degraded (still sync-ing is ok)
2915       time.sleep(15)
2916       feedback_fn("* checking mirrors status")
2917       disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
2918     else:
2919       disk_abort = False
2920
2921     if disk_abort:
2922       _RemoveDisks(iobj, self.cfg)
2923       self.cfg.RemoveInstance(iobj.name)
2924       raise errors.OpExecError("There are some degraded disks for"
2925                                " this instance")
2926
2927     feedback_fn("creating os for instance %s on node %s" %
2928                 (instance, pnode_name))
2929
2930     if iobj.disk_template != constants.DT_DISKLESS:
2931       if self.op.mode == constants.INSTANCE_CREATE:
2932         feedback_fn("* running the instance OS create scripts...")
2933         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
2934           raise errors.OpExecError("could not add os for instance %s"
2935                                    " on node %s" %
2936                                    (instance, pnode_name))
2937
2938       elif self.op.mode == constants.INSTANCE_IMPORT:
2939         feedback_fn("* running the instance OS import scripts...")
2940         src_node = self.op.src_node
2941         src_image = self.src_image
2942         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
2943                                                 src_node, src_image):
2944           raise errors.OpExecError("Could not import os for instance"
2945                                    " %s on node %s" %
2946                                    (instance, pnode_name))
2947       else:
2948         # also checked in the prereq part
2949         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
2950                                      % self.op.mode)
2951
2952     if self.op.start:
2953       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
2954       feedback_fn("* starting instance...")
2955       if not rpc.call_instance_start(pnode_name, iobj, None):
2956         raise errors.OpExecError("Could not start instance")
2957
2958
2959 class LUConnectConsole(NoHooksLU):
2960   """Connect to an instance's console.
2961
2962   This is somewhat special in that it returns the command line that
2963   you need to run on the master node in order to connect to the
2964   console.
2965
2966   """
2967   _OP_REQP = ["instance_name"]
2968
2969   def CheckPrereq(self):
2970     """Check prerequisites.
2971
2972     This checks that the instance is in the cluster.
2973
2974     """
2975     instance = self.cfg.GetInstanceInfo(
2976       self.cfg.ExpandInstanceName(self.op.instance_name))
2977     if instance is None:
2978       raise errors.OpPrereqError("Instance '%s' not known" %
2979                                  self.op.instance_name)
2980     self.instance = instance
2981
2982   def Exec(self, feedback_fn):
2983     """Connect to the console of an instance
2984
2985     """
2986     instance = self.instance
2987     node = instance.primary_node
2988
2989     node_insts = rpc.call_instance_list([node])[node]
2990     if node_insts is False:
2991       raise errors.OpExecError("Can't connect to node %s." % node)
2992
2993     if instance.name not in node_insts:
2994       raise errors.OpExecError("Instance %s is not running." % instance.name)
2995
2996     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
2997
2998     hyper = hypervisor.GetHypervisor()
2999     console_cmd = hyper.GetShellCommandForConsole(instance.name)
3000     # build ssh cmdline
3001     argv = ["ssh", "-q", "-t"]
3002     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3003     argv.extend(ssh.BATCH_MODE_OPTS)
3004     argv.append(node)
3005     argv.append(console_cmd)
3006     return "ssh", argv
3007
3008
3009 class LUAddMDDRBDComponent(LogicalUnit):
3010   """Adda new mirror member to an instance's disk.
3011
3012   """
3013   HPATH = "mirror-add"
3014   HTYPE = constants.HTYPE_INSTANCE
3015   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3016
3017   def BuildHooksEnv(self):
3018     """Build hooks env.
3019
3020     This runs on the master, the primary and all the secondaries.
3021
3022     """
3023     env = {
3024       "NEW_SECONDARY": self.op.remote_node,
3025       "DISK_NAME": self.op.disk_name,
3026       }
3027     env.update(_BuildInstanceHookEnvByObject(self.instance))
3028     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3029           self.op.remote_node,] + list(self.instance.secondary_nodes)
3030     return env, nl, nl
3031
3032   def CheckPrereq(self):
3033     """Check prerequisites.
3034
3035     This checks that the instance is in the cluster.
3036
3037     """
3038     instance = self.cfg.GetInstanceInfo(
3039       self.cfg.ExpandInstanceName(self.op.instance_name))
3040     if instance is None:
3041       raise errors.OpPrereqError("Instance '%s' not known" %
3042                                  self.op.instance_name)
3043     self.instance = instance
3044
3045     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3046     if remote_node is None:
3047       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3048     self.remote_node = remote_node
3049
3050     if remote_node == instance.primary_node:
3051       raise errors.OpPrereqError("The specified node is the primary node of"
3052                                  " the instance.")
3053
3054     if instance.disk_template != constants.DT_REMOTE_RAID1:
3055       raise errors.OpPrereqError("Instance's disk layout is not"
3056                                  " remote_raid1.")
3057     for disk in instance.disks:
3058       if disk.iv_name == self.op.disk_name:
3059         break
3060     else:
3061       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3062                                  " instance." % self.op.disk_name)
3063     if len(disk.children) > 1:
3064       raise errors.OpPrereqError("The device already has two slave"
3065                                  " devices.\n"
3066                                  "This would create a 3-disk raid1"
3067                                  " which we don't allow.")
3068     self.disk = disk
3069
3070   def Exec(self, feedback_fn):
3071     """Add the mirror component
3072
3073     """
3074     disk = self.disk
3075     instance = self.instance
3076
3077     remote_node = self.remote_node
3078     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3079     names = _GenerateUniqueNames(self.cfg, lv_names)
3080     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3081                                      remote_node, disk.size, names)
3082
3083     logger.Info("adding new mirror component on secondary")
3084     #HARDCODE
3085     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
3086                                       _GetInstanceInfoText(instance)):
3087       raise errors.OpExecError("Failed to create new component on secondary"
3088                                " node %s" % remote_node)
3089
3090     logger.Info("adding new mirror component on primary")
3091     #HARDCODE
3092     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
3093                                     _GetInstanceInfoText(instance)):
3094       # remove secondary dev
3095       self.cfg.SetDiskID(new_drbd, remote_node)
3096       rpc.call_blockdev_remove(remote_node, new_drbd)
3097       raise errors.OpExecError("Failed to create volume on primary")
3098
3099     # the device exists now
3100     # call the primary node to add the mirror to md
3101     logger.Info("adding new mirror component to md")
3102     if not rpc.call_blockdev_addchild(instance.primary_node,
3103                                            disk, new_drbd):
3104       logger.Error("Can't add mirror compoment to md!")
3105       self.cfg.SetDiskID(new_drbd, remote_node)
3106       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3107         logger.Error("Can't rollback on secondary")
3108       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3109       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3110         logger.Error("Can't rollback on primary")
3111       raise errors.OpExecError("Can't add mirror component to md array")
3112
3113     disk.children.append(new_drbd)
3114
3115     self.cfg.AddInstance(instance)
3116
3117     _WaitForSync(self.cfg, instance)
3118
3119     return 0
3120
3121
3122 class LURemoveMDDRBDComponent(LogicalUnit):
3123   """Remove a component from a remote_raid1 disk.
3124
3125   """
3126   HPATH = "mirror-remove"
3127   HTYPE = constants.HTYPE_INSTANCE
3128   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3129
3130   def BuildHooksEnv(self):
3131     """Build hooks env.
3132
3133     This runs on the master, the primary and all the secondaries.
3134
3135     """
3136     env = {
3137       "DISK_NAME": self.op.disk_name,
3138       "DISK_ID": self.op.disk_id,
3139       "OLD_SECONDARY": self.old_secondary,
3140       }
3141     env.update(_BuildInstanceHookEnvByObject(self.instance))
3142     nl = [self.sstore.GetMasterNode(),
3143           self.instance.primary_node] + list(self.instance.secondary_nodes)
3144     return env, nl, nl
3145
3146   def CheckPrereq(self):
3147     """Check prerequisites.
3148
3149     This checks that the instance is in the cluster.
3150
3151     """
3152     instance = self.cfg.GetInstanceInfo(
3153       self.cfg.ExpandInstanceName(self.op.instance_name))
3154     if instance is None:
3155       raise errors.OpPrereqError("Instance '%s' not known" %
3156                                  self.op.instance_name)
3157     self.instance = instance
3158
3159     if instance.disk_template != constants.DT_REMOTE_RAID1:
3160       raise errors.OpPrereqError("Instance's disk layout is not"
3161                                  " remote_raid1.")
3162     for disk in instance.disks:
3163       if disk.iv_name == self.op.disk_name:
3164         break
3165     else:
3166       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3167                                  " instance." % self.op.disk_name)
3168     for child in disk.children:
3169       if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
3170         break
3171     else:
3172       raise errors.OpPrereqError("Can't find the device with this port.")
3173
3174     if len(disk.children) < 2:
3175       raise errors.OpPrereqError("Cannot remove the last component from"
3176                                  " a mirror.")
3177     self.disk = disk
3178     self.child = child
3179     if self.child.logical_id[0] == instance.primary_node:
3180       oid = 1
3181     else:
3182       oid = 0
3183     self.old_secondary = self.child.logical_id[oid]
3184
3185   def Exec(self, feedback_fn):
3186     """Remove the mirror component
3187
3188     """
3189     instance = self.instance
3190     disk = self.disk
3191     child = self.child
3192     logger.Info("remove mirror component")
3193     self.cfg.SetDiskID(disk, instance.primary_node)
3194     if not rpc.call_blockdev_removechild(instance.primary_node,
3195                                               disk, child):
3196       raise errors.OpExecError("Can't remove child from mirror.")
3197
3198     for node in child.logical_id[:2]:
3199       self.cfg.SetDiskID(child, node)
3200       if not rpc.call_blockdev_remove(node, child):
3201         logger.Error("Warning: failed to remove device from node %s,"
3202                      " continuing operation." % node)
3203
3204     disk.children.remove(child)
3205     self.cfg.AddInstance(instance)
3206
3207
3208 class LUReplaceDisks(LogicalUnit):
3209   """Replace the disks of an instance.
3210
3211   """
3212   HPATH = "mirrors-replace"
3213   HTYPE = constants.HTYPE_INSTANCE
3214   _OP_REQP = ["instance_name"]
3215
3216   def BuildHooksEnv(self):
3217     """Build hooks env.
3218
3219     This runs on the master, the primary and all the secondaries.
3220
3221     """
3222     env = {
3223       "NEW_SECONDARY": self.op.remote_node,
3224       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3225       }
3226     env.update(_BuildInstanceHookEnvByObject(self.instance))
3227     nl = [self.sstore.GetMasterNode(),
3228           self.instance.primary_node] + list(self.instance.secondary_nodes)
3229     return env, nl, nl
3230
3231   def CheckPrereq(self):
3232     """Check prerequisites.
3233
3234     This checks that the instance is in the cluster.
3235
3236     """
3237     instance = self.cfg.GetInstanceInfo(
3238       self.cfg.ExpandInstanceName(self.op.instance_name))
3239     if instance is None:
3240       raise errors.OpPrereqError("Instance '%s' not known" %
3241                                  self.op.instance_name)
3242     self.instance = instance
3243
3244     if instance.disk_template != constants.DT_REMOTE_RAID1:
3245       raise errors.OpPrereqError("Instance's disk layout is not"
3246                                  " remote_raid1.")
3247
3248     if len(instance.secondary_nodes) != 1:
3249       raise errors.OpPrereqError("The instance has a strange layout,"
3250                                  " expected one secondary but found %d" %
3251                                  len(instance.secondary_nodes))
3252
3253     remote_node = getattr(self.op, "remote_node", None)
3254     if remote_node is None:
3255       remote_node = instance.secondary_nodes[0]
3256     else:
3257       remote_node = self.cfg.ExpandNodeName(remote_node)
3258       if remote_node is None:
3259         raise errors.OpPrereqError("Node '%s' not known" %
3260                                    self.op.remote_node)
3261     if remote_node == instance.primary_node:
3262       raise errors.OpPrereqError("The specified node is the primary node of"
3263                                  " the instance.")
3264     self.op.remote_node = remote_node
3265
3266   def Exec(self, feedback_fn):
3267     """Replace the disks of an instance.
3268
3269     """
3270     instance = self.instance
3271     iv_names = {}
3272     # start of work
3273     remote_node = self.op.remote_node
3274     cfg = self.cfg
3275     for dev in instance.disks:
3276       size = dev.size
3277       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3278       names = _GenerateUniqueNames(cfg, lv_names)
3279       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3280                                        remote_node, size, names)
3281       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3282       logger.Info("adding new mirror component on secondary for %s" %
3283                   dev.iv_name)
3284       #HARDCODE
3285       if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
3286                                         _GetInstanceInfoText(instance)):
3287         raise errors.OpExecError("Failed to create new component on"
3288                                  " secondary node %s\n"
3289                                  "Full abort, cleanup manually!" %
3290                                  remote_node)
3291
3292       logger.Info("adding new mirror component on primary")
3293       #HARDCODE
3294       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
3295                                       _GetInstanceInfoText(instance)):
3296         # remove secondary dev
3297         cfg.SetDiskID(new_drbd, remote_node)
3298         rpc.call_blockdev_remove(remote_node, new_drbd)
3299         raise errors.OpExecError("Failed to create volume on primary!\n"
3300                                  "Full abort, cleanup manually!!")
3301
3302       # the device exists now
3303       # call the primary node to add the mirror to md
3304       logger.Info("adding new mirror component to md")
3305       if not rpc.call_blockdev_addchild(instance.primary_node, dev,
3306                                         new_drbd):
3307         logger.Error("Can't add mirror compoment to md!")
3308         cfg.SetDiskID(new_drbd, remote_node)
3309         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3310           logger.Error("Can't rollback on secondary")
3311         cfg.SetDiskID(new_drbd, instance.primary_node)
3312         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3313           logger.Error("Can't rollback on primary")
3314         raise errors.OpExecError("Full abort, cleanup manually!!")
3315
3316       dev.children.append(new_drbd)
3317       cfg.AddInstance(instance)
3318
3319     # this can fail as the old devices are degraded and _WaitForSync
3320     # does a combined result over all disks, so we don't check its
3321     # return value
3322     _WaitForSync(cfg, instance, unlock=True)
3323
3324     # so check manually all the devices
3325     for name in iv_names:
3326       dev, child, new_drbd = iv_names[name]
3327       cfg.SetDiskID(dev, instance.primary_node)
3328       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3329       if is_degr:
3330         raise errors.OpExecError("MD device %s is degraded!" % name)
3331       cfg.SetDiskID(new_drbd, instance.primary_node)
3332       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3333       if is_degr:
3334         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3335
3336     for name in iv_names:
3337       dev, child, new_drbd = iv_names[name]
3338       logger.Info("remove mirror %s component" % name)
3339       cfg.SetDiskID(dev, instance.primary_node)
3340       if not rpc.call_blockdev_removechild(instance.primary_node,
3341                                                 dev, child):
3342         logger.Error("Can't remove child from mirror, aborting"
3343                      " *this device cleanup*.\nYou need to cleanup manually!!")
3344         continue
3345
3346       for node in child.logical_id[:2]:
3347         logger.Info("remove child device on %s" % node)
3348         cfg.SetDiskID(child, node)
3349         if not rpc.call_blockdev_remove(node, child):
3350           logger.Error("Warning: failed to remove device from node %s,"
3351                        " continuing operation." % node)
3352
3353       dev.children.remove(child)
3354
3355       cfg.AddInstance(instance)
3356
3357
3358 class LUQueryInstanceData(NoHooksLU):
3359   """Query runtime instance data.
3360
3361   """
3362   _OP_REQP = ["instances"]
3363
3364   def CheckPrereq(self):
3365     """Check prerequisites.
3366
3367     This only checks the optional instance list against the existing names.
3368
3369     """
3370     if not isinstance(self.op.instances, list):
3371       raise errors.OpPrereqError("Invalid argument type 'instances'")
3372     if self.op.instances:
3373       self.wanted_instances = []
3374       names = self.op.instances
3375       for name in names:
3376         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3377         if instance is None:
3378           raise errors.OpPrereqError("No such instance name '%s'" % name)
3379       self.wanted_instances.append(instance)
3380     else:
3381       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3382                                in self.cfg.GetInstanceList()]
3383     return
3384
3385
3386   def _ComputeDiskStatus(self, instance, snode, dev):
3387     """Compute block device status.
3388
3389     """
3390     self.cfg.SetDiskID(dev, instance.primary_node)
3391     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3392     if dev.dev_type == "drbd":
3393       # we change the snode then (otherwise we use the one passed in)
3394       if dev.logical_id[0] == instance.primary_node:
3395         snode = dev.logical_id[1]
3396       else:
3397         snode = dev.logical_id[0]
3398
3399     if snode:
3400       self.cfg.SetDiskID(dev, snode)
3401       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3402     else:
3403       dev_sstatus = None
3404
3405     if dev.children:
3406       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3407                       for child in dev.children]
3408     else:
3409       dev_children = []
3410
3411     data = {
3412       "iv_name": dev.iv_name,
3413       "dev_type": dev.dev_type,
3414       "logical_id": dev.logical_id,
3415       "physical_id": dev.physical_id,
3416       "pstatus": dev_pstatus,
3417       "sstatus": dev_sstatus,
3418       "children": dev_children,
3419       }
3420
3421     return data
3422
3423   def Exec(self, feedback_fn):
3424     """Gather and return data"""
3425     result = {}
3426     for instance in self.wanted_instances:
3427       remote_info = rpc.call_instance_info(instance.primary_node,
3428                                                 instance.name)
3429       if remote_info and "state" in remote_info:
3430         remote_state = "up"
3431       else:
3432         remote_state = "down"
3433       if instance.status == "down":
3434         config_state = "down"
3435       else:
3436         config_state = "up"
3437
3438       disks = [self._ComputeDiskStatus(instance, None, device)
3439                for device in instance.disks]
3440
3441       idict = {
3442         "name": instance.name,
3443         "config_state": config_state,
3444         "run_state": remote_state,
3445         "pnode": instance.primary_node,
3446         "snodes": instance.secondary_nodes,
3447         "os": instance.os,
3448         "memory": instance.memory,
3449         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3450         "disks": disks,
3451         }
3452
3453       result[instance.name] = idict
3454
3455     return result
3456
3457
3458 class LUSetInstanceParms(LogicalUnit):
3459   """Modifies an instances's parameters.
3460
3461   """
3462   HPATH = "instance-modify"
3463   HTYPE = constants.HTYPE_INSTANCE
3464   _OP_REQP = ["instance_name"]
3465
3466   def BuildHooksEnv(self):
3467     """Build hooks env.
3468
3469     This runs on the master, primary and secondaries.
3470
3471     """
3472     args = dict()
3473     if self.mem:
3474       args['memory'] = self.mem
3475     if self.vcpus:
3476       args['vcpus'] = self.vcpus
3477     if self.do_ip or self.do_bridge:
3478       if self.do_ip:
3479         ip = self.ip
3480       else:
3481         ip = self.instance.nics[0].ip
3482       if self.bridge:
3483         bridge = self.bridge
3484       else:
3485         bridge = self.instance.nics[0].bridge
3486       args['nics'] = [(ip, bridge)]
3487     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3488     nl = [self.sstore.GetMasterNode(),
3489           self.instance.primary_node] + list(self.instance.secondary_nodes)
3490     return env, nl, nl
3491
3492   def CheckPrereq(self):
3493     """Check prerequisites.
3494
3495     This only checks the instance list against the existing names.
3496
3497     """
3498     self.mem = getattr(self.op, "mem", None)
3499     self.vcpus = getattr(self.op, "vcpus", None)
3500     self.ip = getattr(self.op, "ip", None)
3501     self.bridge = getattr(self.op, "bridge", None)
3502     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3503       raise errors.OpPrereqError("No changes submitted")
3504     if self.mem is not None:
3505       try:
3506         self.mem = int(self.mem)
3507       except ValueError, err:
3508         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3509     if self.vcpus is not None:
3510       try:
3511         self.vcpus = int(self.vcpus)
3512       except ValueError, err:
3513         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3514     if self.ip is not None:
3515       self.do_ip = True
3516       if self.ip.lower() == "none":
3517         self.ip = None
3518       else:
3519         if not utils.IsValidIP(self.ip):
3520           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3521     else:
3522       self.do_ip = False
3523     self.do_bridge = (self.bridge is not None)
3524
3525     instance = self.cfg.GetInstanceInfo(
3526       self.cfg.ExpandInstanceName(self.op.instance_name))
3527     if instance is None:
3528       raise errors.OpPrereqError("No such instance name '%s'" %
3529                                  self.op.instance_name)
3530     self.op.instance_name = instance.name
3531     self.instance = instance
3532     return
3533
3534   def Exec(self, feedback_fn):
3535     """Modifies an instance.
3536
3537     All parameters take effect only at the next restart of the instance.
3538     """
3539     result = []
3540     instance = self.instance
3541     if self.mem:
3542       instance.memory = self.mem
3543       result.append(("mem", self.mem))
3544     if self.vcpus:
3545       instance.vcpus = self.vcpus
3546       result.append(("vcpus",  self.vcpus))
3547     if self.do_ip:
3548       instance.nics[0].ip = self.ip
3549       result.append(("ip", self.ip))
3550     if self.bridge:
3551       instance.nics[0].bridge = self.bridge
3552       result.append(("bridge", self.bridge))
3553
3554     self.cfg.AddInstance(instance)
3555
3556     return result
3557
3558
3559 class LUQueryExports(NoHooksLU):
3560   """Query the exports list
3561
3562   """
3563   _OP_REQP = []
3564
3565   def CheckPrereq(self):
3566     """Check that the nodelist contains only existing nodes.
3567
3568     """
3569     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
3570
3571   def Exec(self, feedback_fn):
3572     """Compute the list of all the exported system images.
3573
3574     Returns:
3575       a dictionary with the structure node->(export-list)
3576       where export-list is a list of the instances exported on
3577       that node.
3578
3579     """
3580     return rpc.call_export_list(self.nodes)
3581
3582
3583 class LUExportInstance(LogicalUnit):
3584   """Export an instance to an image in the cluster.
3585
3586   """
3587   HPATH = "instance-export"
3588   HTYPE = constants.HTYPE_INSTANCE
3589   _OP_REQP = ["instance_name", "target_node", "shutdown"]
3590
3591   def BuildHooksEnv(self):
3592     """Build hooks env.
3593
3594     This will run on the master, primary node and target node.
3595
3596     """
3597     env = {
3598       "EXPORT_NODE": self.op.target_node,
3599       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
3600       }
3601     env.update(_BuildInstanceHookEnvByObject(self.instance))
3602     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3603           self.op.target_node]
3604     return env, nl, nl
3605
3606   def CheckPrereq(self):
3607     """Check prerequisites.
3608
3609     This checks that the instance name is a valid one.
3610
3611     """
3612     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
3613     self.instance = self.cfg.GetInstanceInfo(instance_name)
3614     if self.instance is None:
3615       raise errors.OpPrereqError("Instance '%s' not found" %
3616                                  self.op.instance_name)
3617
3618     # node verification
3619     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
3620     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
3621
3622     if self.dst_node is None:
3623       raise errors.OpPrereqError("Destination node '%s' is unknown." %
3624                                  self.op.target_node)
3625     self.op.target_node = self.dst_node.name
3626
3627   def Exec(self, feedback_fn):
3628     """Export an instance to an image in the cluster.
3629
3630     """
3631     instance = self.instance
3632     dst_node = self.dst_node
3633     src_node = instance.primary_node
3634     # shutdown the instance, unless requested not to do so
3635     if self.op.shutdown:
3636       op = opcodes.OpShutdownInstance(instance_name=instance.name)
3637       self.processor.ChainOpCode(op, feedback_fn)
3638
3639     vgname = self.cfg.GetVGName()
3640
3641     snap_disks = []
3642
3643     try:
3644       for disk in instance.disks:
3645         if disk.iv_name == "sda":
3646           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
3647           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
3648
3649           if not new_dev_name:
3650             logger.Error("could not snapshot block device %s on node %s" %
3651                          (disk.logical_id[1], src_node))
3652           else:
3653             new_dev = objects.Disk(dev_type="lvm", size=disk.size,
3654                                       logical_id=(vgname, new_dev_name),
3655                                       physical_id=(vgname, new_dev_name),
3656                                       iv_name=disk.iv_name)
3657             snap_disks.append(new_dev)
3658
3659     finally:
3660       if self.op.shutdown:
3661         op = opcodes.OpStartupInstance(instance_name=instance.name,
3662                                        force=False)
3663         self.processor.ChainOpCode(op, feedback_fn)
3664
3665     # TODO: check for size
3666
3667     for dev in snap_disks:
3668       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
3669                                            instance):
3670         logger.Error("could not export block device %s from node"
3671                      " %s to node %s" %
3672                      (dev.logical_id[1], src_node, dst_node.name))
3673       if not rpc.call_blockdev_remove(src_node, dev):
3674         logger.Error("could not remove snapshot block device %s from"
3675                      " node %s" % (dev.logical_id[1], src_node))
3676
3677     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
3678       logger.Error("could not finalize export for instance %s on node %s" %
3679                    (instance.name, dst_node.name))
3680
3681     nodelist = self.cfg.GetNodeList()
3682     nodelist.remove(dst_node.name)
3683
3684     # on one-node clusters nodelist will be empty after the removal
3685     # if we proceed the backup would be removed because OpQueryExports
3686     # substitutes an empty list with the full cluster node list.
3687     if nodelist:
3688       op = opcodes.OpQueryExports(nodes=nodelist)
3689       exportlist = self.processor.ChainOpCode(op, feedback_fn)
3690       for node in exportlist:
3691         if instance.name in exportlist[node]:
3692           if not rpc.call_export_remove(node, instance.name):
3693             logger.Error("could not remove older export for instance %s"
3694                          " on node %s" % (instance.name, node))
3695
3696
3697 class TagsLU(NoHooksLU):
3698   """Generic tags LU.
3699
3700   This is an abstract class which is the parent of all the other tags LUs.
3701
3702   """
3703   def CheckPrereq(self):
3704     """Check prerequisites.
3705
3706     """
3707     if self.op.kind == constants.TAG_CLUSTER:
3708       self.target = self.cfg.GetClusterInfo()
3709     elif self.op.kind == constants.TAG_NODE:
3710       name = self.cfg.ExpandNodeName(self.op.name)
3711       if name is None:
3712         raise errors.OpPrereqError("Invalid node name (%s)" %
3713                                    (self.op.name,))
3714       self.op.name = name
3715       self.target = self.cfg.GetNodeInfo(name)
3716     elif self.op.kind == constants.TAG_INSTANCE:
3717       name = self.cfg.ExpandInstanceName(self.op.name)
3718       if name is None:
3719         raise errors.OpPrereqError("Invalid instance name (%s)" %
3720                                    (self.op.name,))
3721       self.op.name = name
3722       self.target = self.cfg.GetInstanceInfo(name)
3723     else:
3724       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
3725                                  str(self.op.kind))
3726
3727
3728 class LUGetTags(TagsLU):
3729   """Returns the tags of a given object.
3730
3731   """
3732   _OP_REQP = ["kind", "name"]
3733
3734   def Exec(self, feedback_fn):
3735     """Returns the tag list.
3736
3737     """
3738     return self.target.GetTags()
3739
3740
3741 class LUAddTags(TagsLU):
3742   """Sets a tag on a given object.
3743
3744   """
3745   _OP_REQP = ["kind", "name", "tags"]
3746
3747   def CheckPrereq(self):
3748     """Check prerequisites.
3749
3750     This checks the type and length of the tag name and value.
3751
3752     """
3753     TagsLU.CheckPrereq(self)
3754     for tag in self.op.tags:
3755       objects.TaggableObject.ValidateTag(tag)
3756
3757   def Exec(self, feedback_fn):
3758     """Sets the tag.
3759
3760     """
3761     try:
3762       for tag in self.op.tags:
3763         self.target.AddTag(tag)
3764     except errors.TagError, err:
3765       raise errors.OpExecError("Error while setting tag: %s" % str(err))
3766     try:
3767       self.cfg.Update(self.target)
3768     except errors.ConfigurationError:
3769       raise errors.OpRetryError("There has been a modification to the"
3770                                 " config file and the operation has been"
3771                                 " aborted. Please retry.")
3772
3773
3774 class LUDelTags(TagsLU):
3775   """Delete a list of tags from a given object.
3776
3777   """
3778   _OP_REQP = ["kind", "name", "tags"]
3779
3780   def CheckPrereq(self):
3781     """Check prerequisites.
3782
3783     This checks that we have the given tag.
3784
3785     """
3786     TagsLU.CheckPrereq(self)
3787     for tag in self.op.tags:
3788       objects.TaggableObject.ValidateTag(tag)
3789     del_tags = frozenset(self.op.tags)
3790     cur_tags = self.target.GetTags()
3791     if not del_tags <= cur_tags:
3792       diff_tags = del_tags - cur_tags
3793       diff_names = ["'%s'" % tag for tag in diff_tags]
3794       diff_names.sort()
3795       raise errors.OpPrereqError("Tag(s) %s not found" %
3796                                  (",".join(diff_names)))
3797
3798   def Exec(self, feedback_fn):
3799     """Remove the tag from the object.
3800
3801     """
3802     for tag in self.op.tags:
3803       self.target.RemoveTag(tag)
3804     try:
3805       self.cfg.Update(self.target)
3806     except errors.ConfigurationError:
3807       raise errors.OpRetryError("There has been a modification to the"
3808                                 " config file and the operation has been"
3809                                 " aborted. Please retry.")