Add missing descriptions to {Add,Remove}EtcHostsEntry.
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _RemoveHostFromEtcHosts(hostname):
167   """Wrapper around utils.RemoteEtcHostsEntry.
168
169   """
170   hi = utils.HostInfo(name=hostname)
171   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
172   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
173
174
175 def _GetWantedNodes(lu, nodes):
176   """Returns list of checked and expanded node names.
177
178   Args:
179     nodes: List of nodes (strings) or None for all
180
181   """
182   if not isinstance(nodes, list):
183     raise errors.OpPrereqError("Invalid argument type 'nodes'")
184
185   if nodes:
186     wanted = []
187
188     for name in nodes:
189       node = lu.cfg.ExpandNodeName(name)
190       if node is None:
191         raise errors.OpPrereqError("No such node name '%s'" % name)
192       wanted.append(node)
193
194   else:
195     wanted = lu.cfg.GetNodeList()
196   return utils.NiceSort(wanted)
197
198
199 def _GetWantedInstances(lu, instances):
200   """Returns list of checked and expanded instance names.
201
202   Args:
203     instances: List of instances (strings) or None for all
204
205   """
206   if not isinstance(instances, list):
207     raise errors.OpPrereqError("Invalid argument type 'instances'")
208
209   if instances:
210     wanted = []
211
212     for name in instances:
213       instance = lu.cfg.ExpandInstanceName(name)
214       if instance is None:
215         raise errors.OpPrereqError("No such instance name '%s'" % name)
216       wanted.append(instance)
217
218   else:
219     wanted = lu.cfg.GetInstanceList()
220   return utils.NiceSort(wanted)
221
222
223 def _CheckOutputFields(static, dynamic, selected):
224   """Checks whether all selected fields are valid.
225
226   Args:
227     static: Static fields
228     dynamic: Dynamic fields
229
230   """
231   static_fields = frozenset(static)
232   dynamic_fields = frozenset(dynamic)
233
234   all_fields = static_fields | dynamic_fields
235
236   if not all_fields.issuperset(selected):
237     raise errors.OpPrereqError("Unknown output fields selected: %s"
238                                % ",".join(frozenset(selected).
239                                           difference(all_fields)))
240
241
242 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
243                           memory, vcpus, nics):
244   """Builds instance related env variables for hooks from single variables.
245
246   Args:
247     secondary_nodes: List of secondary nodes as strings
248   """
249   env = {
250     "OP_TARGET": name,
251     "INSTANCE_NAME": name,
252     "INSTANCE_PRIMARY": primary_node,
253     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
254     "INSTANCE_OS_TYPE": os_type,
255     "INSTANCE_STATUS": status,
256     "INSTANCE_MEMORY": memory,
257     "INSTANCE_VCPUS": vcpus,
258   }
259
260   if nics:
261     nic_count = len(nics)
262     for idx, (ip, bridge) in enumerate(nics):
263       if ip is None:
264         ip = ""
265       env["INSTANCE_NIC%d_IP" % idx] = ip
266       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
267   else:
268     nic_count = 0
269
270   env["INSTANCE_NIC_COUNT"] = nic_count
271
272   return env
273
274
275 def _BuildInstanceHookEnvByObject(instance, override=None):
276   """Builds instance related env variables for hooks from an object.
277
278   Args:
279     instance: objects.Instance object of instance
280     override: dict of values to override
281   """
282   args = {
283     'name': instance.name,
284     'primary_node': instance.primary_node,
285     'secondary_nodes': instance.secondary_nodes,
286     'os_type': instance.os,
287     'status': instance.os,
288     'memory': instance.memory,
289     'vcpus': instance.vcpus,
290     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
291   }
292   if override:
293     args.update(override)
294   return _BuildInstanceHookEnv(**args)
295
296
297 def _UpdateKnownHosts(fullnode, ip, pubkey):
298   """Ensure a node has a correct known_hosts entry.
299
300   Args:
301     fullnode - Fully qualified domain name of host. (str)
302     ip       - IPv4 address of host (str)
303     pubkey   - the public key of the cluster
304
305   """
306   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
307     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
308   else:
309     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
310
311   inthere = False
312
313   save_lines = []
314   add_lines = []
315   removed = False
316
317   for rawline in f:
318     logger.Debug('read %s' % (repr(rawline),))
319
320     parts = rawline.rstrip('\r\n').split()
321
322     # Ignore unwanted lines
323     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
324       fields = parts[0].split(',')
325       key = parts[2]
326
327       haveall = True
328       havesome = False
329       for spec in [ ip, fullnode ]:
330         if spec not in fields:
331           haveall = False
332         if spec in fields:
333           havesome = True
334
335       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
336       if haveall and key == pubkey:
337         inthere = True
338         save_lines.append(rawline)
339         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
340         continue
341
342       if havesome and (not haveall or key != pubkey):
343         removed = True
344         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
345         continue
346
347     save_lines.append(rawline)
348
349   if not inthere:
350     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
351     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
352
353   if removed:
354     save_lines = save_lines + add_lines
355
356     # Write a new file and replace old.
357     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
358                                    constants.DATA_DIR)
359     newfile = os.fdopen(fd, 'w')
360     try:
361       newfile.write(''.join(save_lines))
362     finally:
363       newfile.close()
364     logger.Debug("Wrote new known_hosts.")
365     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
366
367   elif add_lines:
368     # Simply appending a new line will do the trick.
369     f.seek(0, 2)
370     for add in add_lines:
371       f.write(add)
372
373   f.close()
374
375
376 def _HasValidVG(vglist, vgname):
377   """Checks if the volume group list is valid.
378
379   A non-None return value means there's an error, and the return value
380   is the error message.
381
382   """
383   vgsize = vglist.get(vgname, None)
384   if vgsize is None:
385     return "volume group '%s' missing" % vgname
386   elif vgsize < 20480:
387     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
388             (vgname, vgsize))
389   return None
390
391
392 def _InitSSHSetup(node):
393   """Setup the SSH configuration for the cluster.
394
395
396   This generates a dsa keypair for root, adds the pub key to the
397   permitted hosts and adds the hostkey to its own known hosts.
398
399   Args:
400     node: the name of this host as a fqdn
401
402   """
403   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
404
405   for name in priv_key, pub_key:
406     if os.path.exists(name):
407       utils.CreateBackup(name)
408     utils.RemoveFile(name)
409
410   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
411                          "-f", priv_key,
412                          "-q", "-N", ""])
413   if result.failed:
414     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
415                              result.output)
416
417   f = open(pub_key, 'r')
418   try:
419     utils.AddAuthorizedKey(auth_keys, f.read(8192))
420   finally:
421     f.close()
422
423
424 def _InitGanetiServerSetup(ss):
425   """Setup the necessary configuration for the initial node daemon.
426
427   This creates the nodepass file containing the shared password for
428   the cluster and also generates the SSL certificate.
429
430   """
431   # Create pseudo random password
432   randpass = sha.new(os.urandom(64)).hexdigest()
433   # and write it into sstore
434   ss.SetKey(ss.SS_NODED_PASS, randpass)
435
436   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
437                          "-days", str(365*5), "-nodes", "-x509",
438                          "-keyout", constants.SSL_CERT_FILE,
439                          "-out", constants.SSL_CERT_FILE, "-batch"])
440   if result.failed:
441     raise errors.OpExecError("could not generate server ssl cert, command"
442                              " %s had exitcode %s and error message %s" %
443                              (result.cmd, result.exit_code, result.output))
444
445   os.chmod(constants.SSL_CERT_FILE, 0400)
446
447   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
448
449   if result.failed:
450     raise errors.OpExecError("Could not start the node daemon, command %s"
451                              " had exitcode %s and error %s" %
452                              (result.cmd, result.exit_code, result.output))
453
454
455 def _CheckInstanceBridgesExist(instance):
456   """Check that the brigdes needed by an instance exist.
457
458   """
459   # check bridges existance
460   brlist = [nic.bridge for nic in instance.nics]
461   if not rpc.call_bridges_exist(instance.primary_node, brlist):
462     raise errors.OpPrereqError("one or more target bridges %s does not"
463                                " exist on destination node '%s'" %
464                                (brlist, instance.primary_node))
465
466
467 class LUInitCluster(LogicalUnit):
468   """Initialise the cluster.
469
470   """
471   HPATH = "cluster-init"
472   HTYPE = constants.HTYPE_CLUSTER
473   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
474               "def_bridge", "master_netdev"]
475   REQ_CLUSTER = False
476
477   def BuildHooksEnv(self):
478     """Build hooks env.
479
480     Notes: Since we don't require a cluster, we must manually add
481     ourselves in the post-run node list.
482
483     """
484     env = {"OP_TARGET": self.op.cluster_name}
485     return env, [], [self.hostname.name]
486
487   def CheckPrereq(self):
488     """Verify that the passed name is a valid one.
489
490     """
491     if config.ConfigWriter.IsCluster():
492       raise errors.OpPrereqError("Cluster is already initialised")
493
494     self.hostname = hostname = utils.HostInfo()
495
496     if hostname.ip.startswith("127."):
497       raise errors.OpPrereqError("This host's IP resolves to the private"
498                                  " range (%s). Please fix DNS or /etc/hosts." %
499                                  (hostname.ip,))
500
501     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
502
503     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
504                          constants.DEFAULT_NODED_PORT):
505       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
506                                  " to %s,\nbut this ip address does not"
507                                  " belong to this host."
508                                  " Aborting." % hostname.ip)
509
510     secondary_ip = getattr(self.op, "secondary_ip", None)
511     if secondary_ip and not utils.IsValidIP(secondary_ip):
512       raise errors.OpPrereqError("Invalid secondary ip given")
513     if (secondary_ip and
514         secondary_ip != hostname.ip and
515         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
516                            constants.DEFAULT_NODED_PORT))):
517       raise errors.OpPrereqError("You gave %s as secondary IP,\n"
518                                  "but it does not belong to this host." %
519                                  secondary_ip)
520     self.secondary_ip = secondary_ip
521
522     # checks presence of the volume group given
523     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
524
525     if vgstatus:
526       raise errors.OpPrereqError("Error: %s" % vgstatus)
527
528     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
529                     self.op.mac_prefix):
530       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
531                                  self.op.mac_prefix)
532
533     if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
534       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
535                                  self.op.hypervisor_type)
536
537     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
538     if result.failed:
539       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
540                                  (self.op.master_netdev,
541                                   result.output.strip()))
542
543     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
544             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
545       raise errors.OpPrereqError("Init.d script '%s' missing or not "
546                                  "executable." % constants.NODE_INITD_SCRIPT)
547
548   def Exec(self, feedback_fn):
549     """Initialize the cluster.
550
551     """
552     clustername = self.clustername
553     hostname = self.hostname
554
555     # set up the simple store
556     self.sstore = ss = ssconf.SimpleStore()
557     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
558     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
559     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
560     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
561     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
562
563     # set up the inter-node password and certificate
564     _InitGanetiServerSetup(ss)
565
566     # start the master ip
567     rpc.call_node_start_master(hostname.name)
568
569     # set up ssh config and /etc/hosts
570     f = open(constants.SSH_HOST_RSA_PUB, 'r')
571     try:
572       sshline = f.read()
573     finally:
574       f.close()
575     sshkey = sshline.split(" ")[1]
576
577     hi = utils.HostInfo(name=hostname.name)
578     utils.AddEtcHostsEntry(constants.ETC_HOSTS, hostname.name, hi.ip)
579     utils.AddEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName(), hi.ip)
580     del hi
581
582     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
583
584     _InitSSHSetup(hostname.name)
585
586     # init of cluster config file
587     self.cfg = cfgw = config.ConfigWriter()
588     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
589                     sshkey, self.op.mac_prefix,
590                     self.op.vg_name, self.op.def_bridge)
591
592
593 class LUDestroyCluster(NoHooksLU):
594   """Logical unit for destroying the cluster.
595
596   """
597   _OP_REQP = []
598
599   def CheckPrereq(self):
600     """Check prerequisites.
601
602     This checks whether the cluster is empty.
603
604     Any errors are signalled by raising errors.OpPrereqError.
605
606     """
607     master = self.sstore.GetMasterNode()
608
609     nodelist = self.cfg.GetNodeList()
610     if len(nodelist) != 1 or nodelist[0] != master:
611       raise errors.OpPrereqError("There are still %d node(s) in"
612                                  " this cluster." % (len(nodelist) - 1))
613     instancelist = self.cfg.GetInstanceList()
614     if instancelist:
615       raise errors.OpPrereqError("There are still %d instance(s) in"
616                                  " this cluster." % len(instancelist))
617
618   def Exec(self, feedback_fn):
619     """Destroys the cluster.
620
621     """
622     master = self.sstore.GetMasterNode()
623     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
624     utils.CreateBackup(priv_key)
625     utils.CreateBackup(pub_key)
626     rpc.call_node_leave_cluster(master)
627     _RemoveHostFromEtcHosts(master)
628
629
630 class LUVerifyCluster(NoHooksLU):
631   """Verifies the cluster status.
632
633   """
634   _OP_REQP = []
635
636   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
637                   remote_version, feedback_fn):
638     """Run multiple tests against a node.
639
640     Test list:
641       - compares ganeti version
642       - checks vg existance and size > 20G
643       - checks config file checksum
644       - checks ssh to other nodes
645
646     Args:
647       node: name of the node to check
648       file_list: required list of files
649       local_cksum: dictionary of local files and their checksums
650
651     """
652     # compares ganeti version
653     local_version = constants.PROTOCOL_VERSION
654     if not remote_version:
655       feedback_fn(" - ERROR: connection to %s failed" % (node))
656       return True
657
658     if local_version != remote_version:
659       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
660                       (local_version, node, remote_version))
661       return True
662
663     # checks vg existance and size > 20G
664
665     bad = False
666     if not vglist:
667       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
668                       (node,))
669       bad = True
670     else:
671       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
672       if vgstatus:
673         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
674         bad = True
675
676     # checks config file checksum
677     # checks ssh to any
678
679     if 'filelist' not in node_result:
680       bad = True
681       feedback_fn("  - ERROR: node hasn't returned file checksum data")
682     else:
683       remote_cksum = node_result['filelist']
684       for file_name in file_list:
685         if file_name not in remote_cksum:
686           bad = True
687           feedback_fn("  - ERROR: file '%s' missing" % file_name)
688         elif remote_cksum[file_name] != local_cksum[file_name]:
689           bad = True
690           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
691
692     if 'nodelist' not in node_result:
693       bad = True
694       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
695     else:
696       if node_result['nodelist']:
697         bad = True
698         for node in node_result['nodelist']:
699           feedback_fn("  - ERROR: communication with node '%s': %s" %
700                           (node, node_result['nodelist'][node]))
701     hyp_result = node_result.get('hypervisor', None)
702     if hyp_result is not None:
703       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
704     return bad
705
706   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
707     """Verify an instance.
708
709     This function checks to see if the required block devices are
710     available on the instance's node.
711
712     """
713     bad = False
714
715     instancelist = self.cfg.GetInstanceList()
716     if not instance in instancelist:
717       feedback_fn("  - ERROR: instance %s not in instance list %s" %
718                       (instance, instancelist))
719       bad = True
720
721     instanceconfig = self.cfg.GetInstanceInfo(instance)
722     node_current = instanceconfig.primary_node
723
724     node_vol_should = {}
725     instanceconfig.MapLVsByNode(node_vol_should)
726
727     for node in node_vol_should:
728       for volume in node_vol_should[node]:
729         if node not in node_vol_is or volume not in node_vol_is[node]:
730           feedback_fn("  - ERROR: volume %s missing on node %s" %
731                           (volume, node))
732           bad = True
733
734     if not instanceconfig.status == 'down':
735       if not instance in node_instance[node_current]:
736         feedback_fn("  - ERROR: instance %s not running on node %s" %
737                         (instance, node_current))
738         bad = True
739
740     for node in node_instance:
741       if (not node == node_current):
742         if instance in node_instance[node]:
743           feedback_fn("  - ERROR: instance %s should not run on node %s" %
744                           (instance, node))
745           bad = True
746
747     return bad
748
749   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
750     """Verify if there are any unknown volumes in the cluster.
751
752     The .os, .swap and backup volumes are ignored. All other volumes are
753     reported as unknown.
754
755     """
756     bad = False
757
758     for node in node_vol_is:
759       for volume in node_vol_is[node]:
760         if node not in node_vol_should or volume not in node_vol_should[node]:
761           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
762                       (volume, node))
763           bad = True
764     return bad
765
766   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
767     """Verify the list of running instances.
768
769     This checks what instances are running but unknown to the cluster.
770
771     """
772     bad = False
773     for node in node_instance:
774       for runninginstance in node_instance[node]:
775         if runninginstance not in instancelist:
776           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
777                           (runninginstance, node))
778           bad = True
779     return bad
780
781   def CheckPrereq(self):
782     """Check prerequisites.
783
784     This has no prerequisites.
785
786     """
787     pass
788
789   def Exec(self, feedback_fn):
790     """Verify integrity of cluster, performing various test on nodes.
791
792     """
793     bad = False
794     feedback_fn("* Verifying global settings")
795     self.cfg.VerifyConfig()
796
797     vg_name = self.cfg.GetVGName()
798     nodelist = utils.NiceSort(self.cfg.GetNodeList())
799     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
800     node_volume = {}
801     node_instance = {}
802
803     # FIXME: verify OS list
804     # do local checksums
805     file_names = list(self.sstore.GetFileList())
806     file_names.append(constants.SSL_CERT_FILE)
807     file_names.append(constants.CLUSTER_CONF_FILE)
808     local_checksums = utils.FingerprintFiles(file_names)
809
810     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
811     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
812     all_instanceinfo = rpc.call_instance_list(nodelist)
813     all_vglist = rpc.call_vg_list(nodelist)
814     node_verify_param = {
815       'filelist': file_names,
816       'nodelist': nodelist,
817       'hypervisor': None,
818       }
819     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
820     all_rversion = rpc.call_version(nodelist)
821
822     for node in nodelist:
823       feedback_fn("* Verifying node %s" % node)
824       result = self._VerifyNode(node, file_names, local_checksums,
825                                 all_vglist[node], all_nvinfo[node],
826                                 all_rversion[node], feedback_fn)
827       bad = bad or result
828
829       # node_volume
830       volumeinfo = all_volumeinfo[node]
831
832       if type(volumeinfo) != dict:
833         feedback_fn("  - ERROR: connection to %s failed" % (node,))
834         bad = True
835         continue
836
837       node_volume[node] = volumeinfo
838
839       # node_instance
840       nodeinstance = all_instanceinfo[node]
841       if type(nodeinstance) != list:
842         feedback_fn("  - ERROR: connection to %s failed" % (node,))
843         bad = True
844         continue
845
846       node_instance[node] = nodeinstance
847
848     node_vol_should = {}
849
850     for instance in instancelist:
851       feedback_fn("* Verifying instance %s" % instance)
852       result =  self._VerifyInstance(instance, node_volume, node_instance,
853                                      feedback_fn)
854       bad = bad or result
855
856       inst_config = self.cfg.GetInstanceInfo(instance)
857
858       inst_config.MapLVsByNode(node_vol_should)
859
860     feedback_fn("* Verifying orphan volumes")
861     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
862                                        feedback_fn)
863     bad = bad or result
864
865     feedback_fn("* Verifying remaining instances")
866     result = self._VerifyOrphanInstances(instancelist, node_instance,
867                                          feedback_fn)
868     bad = bad or result
869
870     return int(bad)
871
872
873 class LURenameCluster(LogicalUnit):
874   """Rename the cluster.
875
876   """
877   HPATH = "cluster-rename"
878   HTYPE = constants.HTYPE_CLUSTER
879   _OP_REQP = ["name"]
880
881   def BuildHooksEnv(self):
882     """Build hooks env.
883
884     """
885     env = {
886       "OP_TARGET": self.op.sstore.GetClusterName(),
887       "NEW_NAME": self.op.name,
888       }
889     mn = self.sstore.GetMasterNode()
890     return env, [mn], [mn]
891
892   def CheckPrereq(self):
893     """Verify that the passed name is a valid one.
894
895     """
896     hostname = utils.HostInfo(self.op.name)
897
898     new_name = hostname.name
899     self.ip = new_ip = hostname.ip
900     old_name = self.sstore.GetClusterName()
901     old_ip = self.sstore.GetMasterIP()
902     if new_name == old_name and new_ip == old_ip:
903       raise errors.OpPrereqError("Neither the name nor the IP address of the"
904                                  " cluster has changed")
905     if new_ip != old_ip:
906       result = utils.RunCmd(["fping", "-q", new_ip])
907       if not result.failed:
908         raise errors.OpPrereqError("The given cluster IP address (%s) is"
909                                    " reachable on the network. Aborting." %
910                                    new_ip)
911
912     self.op.name = new_name
913
914   def Exec(self, feedback_fn):
915     """Rename the cluster.
916
917     """
918     clustername = self.op.name
919     ip = self.ip
920     ss = self.sstore
921
922     # shutdown the master IP
923     master = ss.GetMasterNode()
924     if not rpc.call_node_stop_master(master):
925       raise errors.OpExecError("Could not disable the master role")
926
927     try:
928       # modify the sstore
929       ss.SetKey(ss.SS_MASTER_IP, ip)
930       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
931
932       # Distribute updated ss config to all nodes
933       myself = self.cfg.GetNodeInfo(master)
934       dist_nodes = self.cfg.GetNodeList()
935       if myself.name in dist_nodes:
936         dist_nodes.remove(myself.name)
937
938       logger.Debug("Copying updated ssconf data to all nodes")
939       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
940         fname = ss.KeyToFilename(keyname)
941         result = rpc.call_upload_file(dist_nodes, fname)
942         for to_node in dist_nodes:
943           if not result[to_node]:
944             logger.Error("copy of file %s to node %s failed" %
945                          (fname, to_node))
946     finally:
947       if not rpc.call_node_start_master(master):
948         logger.Error("Could not re-enable the master role on the master,\n"
949                      "please restart manually.")
950
951
952 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
953   """Sleep and poll for an instance's disk to sync.
954
955   """
956   if not instance.disks:
957     return True
958
959   if not oneshot:
960     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
961
962   node = instance.primary_node
963
964   for dev in instance.disks:
965     cfgw.SetDiskID(dev, node)
966
967   retries = 0
968   while True:
969     max_time = 0
970     done = True
971     cumul_degraded = False
972     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
973     if not rstats:
974       proc.LogWarning("Can't get any data from node %s" % node)
975       retries += 1
976       if retries >= 10:
977         raise errors.RemoteError("Can't contact node %s for mirror data,"
978                                  " aborting." % node)
979       time.sleep(6)
980       continue
981     retries = 0
982     for i in range(len(rstats)):
983       mstat = rstats[i]
984       if mstat is None:
985         proc.LogWarning("Can't compute data for node %s/%s" %
986                         (node, instance.disks[i].iv_name))
987         continue
988       # we ignore the ldisk parameter
989       perc_done, est_time, is_degraded, _ = mstat
990       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
991       if perc_done is not None:
992         done = False
993         if est_time is not None:
994           rem_time = "%d estimated seconds remaining" % est_time
995           max_time = est_time
996         else:
997           rem_time = "no time estimate"
998         proc.LogInfo("- device %s: %5.2f%% done, %s" %
999                      (instance.disks[i].iv_name, perc_done, rem_time))
1000     if done or oneshot:
1001       break
1002
1003     if unlock:
1004       utils.Unlock('cmd')
1005     try:
1006       time.sleep(min(60, max_time))
1007     finally:
1008       if unlock:
1009         utils.Lock('cmd')
1010
1011   if done:
1012     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1013   return not cumul_degraded
1014
1015
1016 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1017   """Check that mirrors are not degraded.
1018
1019   The ldisk parameter, if True, will change the test from the
1020   is_degraded attribute (which represents overall non-ok status for
1021   the device(s)) to the ldisk (representing the local storage status).
1022
1023   """
1024   cfgw.SetDiskID(dev, node)
1025   if ldisk:
1026     idx = 6
1027   else:
1028     idx = 5
1029
1030   result = True
1031   if on_primary or dev.AssembleOnSecondary():
1032     rstats = rpc.call_blockdev_find(node, dev)
1033     if not rstats:
1034       logger.ToStderr("Can't get any data from node %s" % node)
1035       result = False
1036     else:
1037       result = result and (not rstats[idx])
1038   if dev.children:
1039     for child in dev.children:
1040       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1041
1042   return result
1043
1044
1045 class LUDiagnoseOS(NoHooksLU):
1046   """Logical unit for OS diagnose/query.
1047
1048   """
1049   _OP_REQP = []
1050
1051   def CheckPrereq(self):
1052     """Check prerequisites.
1053
1054     This always succeeds, since this is a pure query LU.
1055
1056     """
1057     return
1058
1059   def Exec(self, feedback_fn):
1060     """Compute the list of OSes.
1061
1062     """
1063     node_list = self.cfg.GetNodeList()
1064     node_data = rpc.call_os_diagnose(node_list)
1065     if node_data == False:
1066       raise errors.OpExecError("Can't gather the list of OSes")
1067     return node_data
1068
1069
1070 class LURemoveNode(LogicalUnit):
1071   """Logical unit for removing a node.
1072
1073   """
1074   HPATH = "node-remove"
1075   HTYPE = constants.HTYPE_NODE
1076   _OP_REQP = ["node_name"]
1077
1078   def BuildHooksEnv(self):
1079     """Build hooks env.
1080
1081     This doesn't run on the target node in the pre phase as a failed
1082     node would not allows itself to run.
1083
1084     """
1085     env = {
1086       "OP_TARGET": self.op.node_name,
1087       "NODE_NAME": self.op.node_name,
1088       }
1089     all_nodes = self.cfg.GetNodeList()
1090     all_nodes.remove(self.op.node_name)
1091     return env, all_nodes, all_nodes
1092
1093   def CheckPrereq(self):
1094     """Check prerequisites.
1095
1096     This checks:
1097      - the node exists in the configuration
1098      - it does not have primary or secondary instances
1099      - it's not the master
1100
1101     Any errors are signalled by raising errors.OpPrereqError.
1102
1103     """
1104     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1105     if node is None:
1106       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1107
1108     instance_list = self.cfg.GetInstanceList()
1109
1110     masternode = self.sstore.GetMasterNode()
1111     if node.name == masternode:
1112       raise errors.OpPrereqError("Node is the master node,"
1113                                  " you need to failover first.")
1114
1115     for instance_name in instance_list:
1116       instance = self.cfg.GetInstanceInfo(instance_name)
1117       if node.name == instance.primary_node:
1118         raise errors.OpPrereqError("Instance %s still running on the node,"
1119                                    " please remove first." % instance_name)
1120       if node.name in instance.secondary_nodes:
1121         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1122                                    " please remove first." % instance_name)
1123     self.op.node_name = node.name
1124     self.node = node
1125
1126   def Exec(self, feedback_fn):
1127     """Removes the node from the cluster.
1128
1129     """
1130     node = self.node
1131     logger.Info("stopping the node daemon and removing configs from node %s" %
1132                 node.name)
1133
1134     rpc.call_node_leave_cluster(node.name)
1135
1136     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1137
1138     logger.Info("Removing node %s from config" % node.name)
1139
1140     self.cfg.RemoveNode(node.name)
1141
1142     _RemoveHostFromEtcHosts(node.name)
1143
1144
1145 class LUQueryNodes(NoHooksLU):
1146   """Logical unit for querying nodes.
1147
1148   """
1149   _OP_REQP = ["output_fields", "names"]
1150
1151   def CheckPrereq(self):
1152     """Check prerequisites.
1153
1154     This checks that the fields required are valid output fields.
1155
1156     """
1157     self.dynamic_fields = frozenset(["dtotal", "dfree",
1158                                      "mtotal", "mnode", "mfree",
1159                                      "bootid"])
1160
1161     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1162                                "pinst_list", "sinst_list",
1163                                "pip", "sip"],
1164                        dynamic=self.dynamic_fields,
1165                        selected=self.op.output_fields)
1166
1167     self.wanted = _GetWantedNodes(self, self.op.names)
1168
1169   def Exec(self, feedback_fn):
1170     """Computes the list of nodes and their attributes.
1171
1172     """
1173     nodenames = self.wanted
1174     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1175
1176     # begin data gathering
1177
1178     if self.dynamic_fields.intersection(self.op.output_fields):
1179       live_data = {}
1180       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1181       for name in nodenames:
1182         nodeinfo = node_data.get(name, None)
1183         if nodeinfo:
1184           live_data[name] = {
1185             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1186             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1187             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1188             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1189             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1190             "bootid": nodeinfo['bootid'],
1191             }
1192         else:
1193           live_data[name] = {}
1194     else:
1195       live_data = dict.fromkeys(nodenames, {})
1196
1197     node_to_primary = dict([(name, set()) for name in nodenames])
1198     node_to_secondary = dict([(name, set()) for name in nodenames])
1199
1200     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1201                              "sinst_cnt", "sinst_list"))
1202     if inst_fields & frozenset(self.op.output_fields):
1203       instancelist = self.cfg.GetInstanceList()
1204
1205       for instance_name in instancelist:
1206         inst = self.cfg.GetInstanceInfo(instance_name)
1207         if inst.primary_node in node_to_primary:
1208           node_to_primary[inst.primary_node].add(inst.name)
1209         for secnode in inst.secondary_nodes:
1210           if secnode in node_to_secondary:
1211             node_to_secondary[secnode].add(inst.name)
1212
1213     # end data gathering
1214
1215     output = []
1216     for node in nodelist:
1217       node_output = []
1218       for field in self.op.output_fields:
1219         if field == "name":
1220           val = node.name
1221         elif field == "pinst_list":
1222           val = list(node_to_primary[node.name])
1223         elif field == "sinst_list":
1224           val = list(node_to_secondary[node.name])
1225         elif field == "pinst_cnt":
1226           val = len(node_to_primary[node.name])
1227         elif field == "sinst_cnt":
1228           val = len(node_to_secondary[node.name])
1229         elif field == "pip":
1230           val = node.primary_ip
1231         elif field == "sip":
1232           val = node.secondary_ip
1233         elif field in self.dynamic_fields:
1234           val = live_data[node.name].get(field, None)
1235         else:
1236           raise errors.ParameterError(field)
1237         node_output.append(val)
1238       output.append(node_output)
1239
1240     return output
1241
1242
1243 class LUQueryNodeVolumes(NoHooksLU):
1244   """Logical unit for getting volumes on node(s).
1245
1246   """
1247   _OP_REQP = ["nodes", "output_fields"]
1248
1249   def CheckPrereq(self):
1250     """Check prerequisites.
1251
1252     This checks that the fields required are valid output fields.
1253
1254     """
1255     self.nodes = _GetWantedNodes(self, self.op.nodes)
1256
1257     _CheckOutputFields(static=["node"],
1258                        dynamic=["phys", "vg", "name", "size", "instance"],
1259                        selected=self.op.output_fields)
1260
1261
1262   def Exec(self, feedback_fn):
1263     """Computes the list of nodes and their attributes.
1264
1265     """
1266     nodenames = self.nodes
1267     volumes = rpc.call_node_volumes(nodenames)
1268
1269     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1270              in self.cfg.GetInstanceList()]
1271
1272     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1273
1274     output = []
1275     for node in nodenames:
1276       if node not in volumes or not volumes[node]:
1277         continue
1278
1279       node_vols = volumes[node][:]
1280       node_vols.sort(key=lambda vol: vol['dev'])
1281
1282       for vol in node_vols:
1283         node_output = []
1284         for field in self.op.output_fields:
1285           if field == "node":
1286             val = node
1287           elif field == "phys":
1288             val = vol['dev']
1289           elif field == "vg":
1290             val = vol['vg']
1291           elif field == "name":
1292             val = vol['name']
1293           elif field == "size":
1294             val = int(float(vol['size']))
1295           elif field == "instance":
1296             for inst in ilist:
1297               if node not in lv_by_node[inst]:
1298                 continue
1299               if vol['name'] in lv_by_node[inst][node]:
1300                 val = inst.name
1301                 break
1302             else:
1303               val = '-'
1304           else:
1305             raise errors.ParameterError(field)
1306           node_output.append(str(val))
1307
1308         output.append(node_output)
1309
1310     return output
1311
1312
1313 class LUAddNode(LogicalUnit):
1314   """Logical unit for adding node to the cluster.
1315
1316   """
1317   HPATH = "node-add"
1318   HTYPE = constants.HTYPE_NODE
1319   _OP_REQP = ["node_name"]
1320
1321   def BuildHooksEnv(self):
1322     """Build hooks env.
1323
1324     This will run on all nodes before, and on all nodes + the new node after.
1325
1326     """
1327     env = {
1328       "OP_TARGET": self.op.node_name,
1329       "NODE_NAME": self.op.node_name,
1330       "NODE_PIP": self.op.primary_ip,
1331       "NODE_SIP": self.op.secondary_ip,
1332       }
1333     nodes_0 = self.cfg.GetNodeList()
1334     nodes_1 = nodes_0 + [self.op.node_name, ]
1335     return env, nodes_0, nodes_1
1336
1337   def CheckPrereq(self):
1338     """Check prerequisites.
1339
1340     This checks:
1341      - the new node is not already in the config
1342      - it is resolvable
1343      - its parameters (single/dual homed) matches the cluster
1344
1345     Any errors are signalled by raising errors.OpPrereqError.
1346
1347     """
1348     node_name = self.op.node_name
1349     cfg = self.cfg
1350
1351     dns_data = utils.HostInfo(node_name)
1352
1353     node = dns_data.name
1354     primary_ip = self.op.primary_ip = dns_data.ip
1355     secondary_ip = getattr(self.op, "secondary_ip", None)
1356     if secondary_ip is None:
1357       secondary_ip = primary_ip
1358     if not utils.IsValidIP(secondary_ip):
1359       raise errors.OpPrereqError("Invalid secondary IP given")
1360     self.op.secondary_ip = secondary_ip
1361     node_list = cfg.GetNodeList()
1362     if node in node_list:
1363       raise errors.OpPrereqError("Node %s is already in the configuration"
1364                                  % node)
1365
1366     for existing_node_name in node_list:
1367       existing_node = cfg.GetNodeInfo(existing_node_name)
1368       if (existing_node.primary_ip == primary_ip or
1369           existing_node.secondary_ip == primary_ip or
1370           existing_node.primary_ip == secondary_ip or
1371           existing_node.secondary_ip == secondary_ip):
1372         raise errors.OpPrereqError("New node ip address(es) conflict with"
1373                                    " existing node %s" % existing_node.name)
1374
1375     # check that the type of the node (single versus dual homed) is the
1376     # same as for the master
1377     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1378     master_singlehomed = myself.secondary_ip == myself.primary_ip
1379     newbie_singlehomed = secondary_ip == primary_ip
1380     if master_singlehomed != newbie_singlehomed:
1381       if master_singlehomed:
1382         raise errors.OpPrereqError("The master has no private ip but the"
1383                                    " new node has one")
1384       else:
1385         raise errors.OpPrereqError("The master has a private ip but the"
1386                                    " new node doesn't have one")
1387
1388     # checks reachablity
1389     if not utils.TcpPing(utils.HostInfo().name,
1390                          primary_ip,
1391                          constants.DEFAULT_NODED_PORT):
1392       raise errors.OpPrereqError("Node not reachable by ping")
1393
1394     if not newbie_singlehomed:
1395       # check reachability from my secondary ip to newbie's secondary ip
1396       if not utils.TcpPing(myself.secondary_ip,
1397                            secondary_ip,
1398                            constants.DEFAULT_NODED_PORT):
1399         raise errors.OpPrereqError(
1400           "Node secondary ip not reachable by TCP based ping to noded port")
1401
1402     self.new_node = objects.Node(name=node,
1403                                  primary_ip=primary_ip,
1404                                  secondary_ip=secondary_ip)
1405
1406   def Exec(self, feedback_fn):
1407     """Adds the new node to the cluster.
1408
1409     """
1410     new_node = self.new_node
1411     node = new_node.name
1412
1413     # set up inter-node password and certificate and restarts the node daemon
1414     gntpass = self.sstore.GetNodeDaemonPassword()
1415     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1416       raise errors.OpExecError("ganeti password corruption detected")
1417     f = open(constants.SSL_CERT_FILE)
1418     try:
1419       gntpem = f.read(8192)
1420     finally:
1421       f.close()
1422     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1423     # so we use this to detect an invalid certificate; as long as the
1424     # cert doesn't contain this, the here-document will be correctly
1425     # parsed by the shell sequence below
1426     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1427       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1428     if not gntpem.endswith("\n"):
1429       raise errors.OpExecError("PEM must end with newline")
1430     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1431
1432     # and then connect with ssh to set password and start ganeti-noded
1433     # note that all the below variables are sanitized at this point,
1434     # either by being constants or by the checks above
1435     ss = self.sstore
1436     mycommand = ("umask 077 && "
1437                  "echo '%s' > '%s' && "
1438                  "cat > '%s' << '!EOF.' && \n"
1439                  "%s!EOF.\n%s restart" %
1440                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1441                   constants.SSL_CERT_FILE, gntpem,
1442                   constants.NODE_INITD_SCRIPT))
1443
1444     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1445     if result.failed:
1446       raise errors.OpExecError("Remote command on node %s, error: %s,"
1447                                " output: %s" %
1448                                (node, result.fail_reason, result.output))
1449
1450     # check connectivity
1451     time.sleep(4)
1452
1453     result = rpc.call_version([node])[node]
1454     if result:
1455       if constants.PROTOCOL_VERSION == result:
1456         logger.Info("communication to node %s fine, sw version %s match" %
1457                     (node, result))
1458       else:
1459         raise errors.OpExecError("Version mismatch master version %s,"
1460                                  " node version %s" %
1461                                  (constants.PROTOCOL_VERSION, result))
1462     else:
1463       raise errors.OpExecError("Cannot get version from the new node")
1464
1465     # setup ssh on node
1466     logger.Info("copy ssh key to node %s" % node)
1467     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1468     keyarray = []
1469     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1470                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1471                 priv_key, pub_key]
1472
1473     for i in keyfiles:
1474       f = open(i, 'r')
1475       try:
1476         keyarray.append(f.read())
1477       finally:
1478         f.close()
1479
1480     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1481                                keyarray[3], keyarray[4], keyarray[5])
1482
1483     if not result:
1484       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1485
1486     # Add node to our /etc/hosts, and add key to known_hosts
1487     hi = utils.HostInfo(name=new_node.name)
1488     utils.AddEtcHostsEntry(constants.ETC_HOSTS, new_node.name, hi.ip)
1489     utils.AddEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName(), hi.ip)
1490     del hi
1491
1492     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1493                       self.cfg.GetHostKey())
1494
1495     if new_node.secondary_ip != new_node.primary_ip:
1496       if not rpc.call_node_tcp_ping(new_node.name,
1497                                     constants.LOCALHOST_IP_ADDRESS,
1498                                     new_node.secondary_ip,
1499                                     constants.DEFAULT_NODED_PORT,
1500                                     10, False):
1501         raise errors.OpExecError("Node claims it doesn't have the"
1502                                  " secondary ip you gave (%s).\n"
1503                                  "Please fix and re-run this command." %
1504                                  new_node.secondary_ip)
1505
1506     success, msg = ssh.VerifyNodeHostname(node)
1507     if not success:
1508       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1509                                " than the one the resolver gives: %s.\n"
1510                                "Please fix and re-run this command." %
1511                                (node, msg))
1512
1513     # Distribute updated /etc/hosts and known_hosts to all nodes,
1514     # including the node just added
1515     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1516     dist_nodes = self.cfg.GetNodeList() + [node]
1517     if myself.name in dist_nodes:
1518       dist_nodes.remove(myself.name)
1519
1520     logger.Debug("Copying hosts and known_hosts to all nodes")
1521     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1522       result = rpc.call_upload_file(dist_nodes, fname)
1523       for to_node in dist_nodes:
1524         if not result[to_node]:
1525           logger.Error("copy of file %s to node %s failed" %
1526                        (fname, to_node))
1527
1528     to_copy = ss.GetFileList()
1529     for fname in to_copy:
1530       if not ssh.CopyFileToNode(node, fname):
1531         logger.Error("could not copy file %s to node %s" % (fname, node))
1532
1533     logger.Info("adding node %s to cluster.conf" % node)
1534     self.cfg.AddNode(new_node)
1535
1536
1537 class LUMasterFailover(LogicalUnit):
1538   """Failover the master node to the current node.
1539
1540   This is a special LU in that it must run on a non-master node.
1541
1542   """
1543   HPATH = "master-failover"
1544   HTYPE = constants.HTYPE_CLUSTER
1545   REQ_MASTER = False
1546   _OP_REQP = []
1547
1548   def BuildHooksEnv(self):
1549     """Build hooks env.
1550
1551     This will run on the new master only in the pre phase, and on all
1552     the nodes in the post phase.
1553
1554     """
1555     env = {
1556       "OP_TARGET": self.new_master,
1557       "NEW_MASTER": self.new_master,
1558       "OLD_MASTER": self.old_master,
1559       }
1560     return env, [self.new_master], self.cfg.GetNodeList()
1561
1562   def CheckPrereq(self):
1563     """Check prerequisites.
1564
1565     This checks that we are not already the master.
1566
1567     """
1568     self.new_master = utils.HostInfo().name
1569     self.old_master = self.sstore.GetMasterNode()
1570
1571     if self.old_master == self.new_master:
1572       raise errors.OpPrereqError("This commands must be run on the node"
1573                                  " where you want the new master to be.\n"
1574                                  "%s is already the master" %
1575                                  self.old_master)
1576
1577   def Exec(self, feedback_fn):
1578     """Failover the master node.
1579
1580     This command, when run on a non-master node, will cause the current
1581     master to cease being master, and the non-master to become new
1582     master.
1583
1584     """
1585     #TODO: do not rely on gethostname returning the FQDN
1586     logger.Info("setting master to %s, old master: %s" %
1587                 (self.new_master, self.old_master))
1588
1589     if not rpc.call_node_stop_master(self.old_master):
1590       logger.Error("could disable the master role on the old master"
1591                    " %s, please disable manually" % self.old_master)
1592
1593     ss = self.sstore
1594     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1595     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1596                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1597       logger.Error("could not distribute the new simple store master file"
1598                    " to the other nodes, please check.")
1599
1600     if not rpc.call_node_start_master(self.new_master):
1601       logger.Error("could not start the master role on the new master"
1602                    " %s, please check" % self.new_master)
1603       feedback_fn("Error in activating the master IP on the new master,\n"
1604                   "please fix manually.")
1605
1606
1607
1608 class LUQueryClusterInfo(NoHooksLU):
1609   """Query cluster configuration.
1610
1611   """
1612   _OP_REQP = []
1613   REQ_MASTER = False
1614
1615   def CheckPrereq(self):
1616     """No prerequsites needed for this LU.
1617
1618     """
1619     pass
1620
1621   def Exec(self, feedback_fn):
1622     """Return cluster config.
1623
1624     """
1625     result = {
1626       "name": self.sstore.GetClusterName(),
1627       "software_version": constants.RELEASE_VERSION,
1628       "protocol_version": constants.PROTOCOL_VERSION,
1629       "config_version": constants.CONFIG_VERSION,
1630       "os_api_version": constants.OS_API_VERSION,
1631       "export_version": constants.EXPORT_VERSION,
1632       "master": self.sstore.GetMasterNode(),
1633       "architecture": (platform.architecture()[0], platform.machine()),
1634       }
1635
1636     return result
1637
1638
1639 class LUClusterCopyFile(NoHooksLU):
1640   """Copy file to cluster.
1641
1642   """
1643   _OP_REQP = ["nodes", "filename"]
1644
1645   def CheckPrereq(self):
1646     """Check prerequisites.
1647
1648     It should check that the named file exists and that the given list
1649     of nodes is valid.
1650
1651     """
1652     if not os.path.exists(self.op.filename):
1653       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1654
1655     self.nodes = _GetWantedNodes(self, self.op.nodes)
1656
1657   def Exec(self, feedback_fn):
1658     """Copy a file from master to some nodes.
1659
1660     Args:
1661       opts - class with options as members
1662       args - list containing a single element, the file name
1663     Opts used:
1664       nodes - list containing the name of target nodes; if empty, all nodes
1665
1666     """
1667     filename = self.op.filename
1668
1669     myname = utils.HostInfo().name
1670
1671     for node in self.nodes:
1672       if node == myname:
1673         continue
1674       if not ssh.CopyFileToNode(node, filename):
1675         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1676
1677
1678 class LUDumpClusterConfig(NoHooksLU):
1679   """Return a text-representation of the cluster-config.
1680
1681   """
1682   _OP_REQP = []
1683
1684   def CheckPrereq(self):
1685     """No prerequisites.
1686
1687     """
1688     pass
1689
1690   def Exec(self, feedback_fn):
1691     """Dump a representation of the cluster config to the standard output.
1692
1693     """
1694     return self.cfg.DumpConfig()
1695
1696
1697 class LURunClusterCommand(NoHooksLU):
1698   """Run a command on some nodes.
1699
1700   """
1701   _OP_REQP = ["command", "nodes"]
1702
1703   def CheckPrereq(self):
1704     """Check prerequisites.
1705
1706     It checks that the given list of nodes is valid.
1707
1708     """
1709     self.nodes = _GetWantedNodes(self, self.op.nodes)
1710
1711   def Exec(self, feedback_fn):
1712     """Run a command on some nodes.
1713
1714     """
1715     data = []
1716     for node in self.nodes:
1717       result = ssh.SSHCall(node, "root", self.op.command)
1718       data.append((node, result.output, result.exit_code))
1719
1720     return data
1721
1722
1723 class LUActivateInstanceDisks(NoHooksLU):
1724   """Bring up an instance's disks.
1725
1726   """
1727   _OP_REQP = ["instance_name"]
1728
1729   def CheckPrereq(self):
1730     """Check prerequisites.
1731
1732     This checks that the instance is in the cluster.
1733
1734     """
1735     instance = self.cfg.GetInstanceInfo(
1736       self.cfg.ExpandInstanceName(self.op.instance_name))
1737     if instance is None:
1738       raise errors.OpPrereqError("Instance '%s' not known" %
1739                                  self.op.instance_name)
1740     self.instance = instance
1741
1742
1743   def Exec(self, feedback_fn):
1744     """Activate the disks.
1745
1746     """
1747     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1748     if not disks_ok:
1749       raise errors.OpExecError("Cannot activate block devices")
1750
1751     return disks_info
1752
1753
1754 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1755   """Prepare the block devices for an instance.
1756
1757   This sets up the block devices on all nodes.
1758
1759   Args:
1760     instance: a ganeti.objects.Instance object
1761     ignore_secondaries: if true, errors on secondary nodes won't result
1762                         in an error return from the function
1763
1764   Returns:
1765     false if the operation failed
1766     list of (host, instance_visible_name, node_visible_name) if the operation
1767          suceeded with the mapping from node devices to instance devices
1768   """
1769   device_info = []
1770   disks_ok = True
1771   for inst_disk in instance.disks:
1772     master_result = None
1773     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1774       cfg.SetDiskID(node_disk, node)
1775       is_primary = node == instance.primary_node
1776       result = rpc.call_blockdev_assemble(node, node_disk,
1777                                           instance.name, is_primary)
1778       if not result:
1779         logger.Error("could not prepare block device %s on node %s (is_pri"
1780                      "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1781         if is_primary or not ignore_secondaries:
1782           disks_ok = False
1783       if is_primary:
1784         master_result = result
1785     device_info.append((instance.primary_node, inst_disk.iv_name,
1786                         master_result))
1787
1788   # leave the disks configured for the primary node
1789   # this is a workaround that would be fixed better by
1790   # improving the logical/physical id handling
1791   for disk in instance.disks:
1792     cfg.SetDiskID(disk, instance.primary_node)
1793
1794   return disks_ok, device_info
1795
1796
1797 def _StartInstanceDisks(cfg, instance, force):
1798   """Start the disks of an instance.
1799
1800   """
1801   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1802                                            ignore_secondaries=force)
1803   if not disks_ok:
1804     _ShutdownInstanceDisks(instance, cfg)
1805     if force is not None and not force:
1806       logger.Error("If the message above refers to a secondary node,"
1807                    " you can retry the operation using '--force'.")
1808     raise errors.OpExecError("Disk consistency error")
1809
1810
1811 class LUDeactivateInstanceDisks(NoHooksLU):
1812   """Shutdown an instance's disks.
1813
1814   """
1815   _OP_REQP = ["instance_name"]
1816
1817   def CheckPrereq(self):
1818     """Check prerequisites.
1819
1820     This checks that the instance is in the cluster.
1821
1822     """
1823     instance = self.cfg.GetInstanceInfo(
1824       self.cfg.ExpandInstanceName(self.op.instance_name))
1825     if instance is None:
1826       raise errors.OpPrereqError("Instance '%s' not known" %
1827                                  self.op.instance_name)
1828     self.instance = instance
1829
1830   def Exec(self, feedback_fn):
1831     """Deactivate the disks
1832
1833     """
1834     instance = self.instance
1835     ins_l = rpc.call_instance_list([instance.primary_node])
1836     ins_l = ins_l[instance.primary_node]
1837     if not type(ins_l) is list:
1838       raise errors.OpExecError("Can't contact node '%s'" %
1839                                instance.primary_node)
1840
1841     if self.instance.name in ins_l:
1842       raise errors.OpExecError("Instance is running, can't shutdown"
1843                                " block devices.")
1844
1845     _ShutdownInstanceDisks(instance, self.cfg)
1846
1847
1848 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1849   """Shutdown block devices of an instance.
1850
1851   This does the shutdown on all nodes of the instance.
1852
1853   If the ignore_primary is false, errors on the primary node are
1854   ignored.
1855
1856   """
1857   result = True
1858   for disk in instance.disks:
1859     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1860       cfg.SetDiskID(top_disk, node)
1861       if not rpc.call_blockdev_shutdown(node, top_disk):
1862         logger.Error("could not shutdown block device %s on node %s" %
1863                      (disk.iv_name, node))
1864         if not ignore_primary or node != instance.primary_node:
1865           result = False
1866   return result
1867
1868
1869 class LUStartupInstance(LogicalUnit):
1870   """Starts an instance.
1871
1872   """
1873   HPATH = "instance-start"
1874   HTYPE = constants.HTYPE_INSTANCE
1875   _OP_REQP = ["instance_name", "force"]
1876
1877   def BuildHooksEnv(self):
1878     """Build hooks env.
1879
1880     This runs on master, primary and secondary nodes of the instance.
1881
1882     """
1883     env = {
1884       "FORCE": self.op.force,
1885       }
1886     env.update(_BuildInstanceHookEnvByObject(self.instance))
1887     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1888           list(self.instance.secondary_nodes))
1889     return env, nl, nl
1890
1891   def CheckPrereq(self):
1892     """Check prerequisites.
1893
1894     This checks that the instance is in the cluster.
1895
1896     """
1897     instance = self.cfg.GetInstanceInfo(
1898       self.cfg.ExpandInstanceName(self.op.instance_name))
1899     if instance is None:
1900       raise errors.OpPrereqError("Instance '%s' not known" %
1901                                  self.op.instance_name)
1902
1903     # check bridges existance
1904     _CheckInstanceBridgesExist(instance)
1905
1906     self.instance = instance
1907     self.op.instance_name = instance.name
1908
1909   def Exec(self, feedback_fn):
1910     """Start the instance.
1911
1912     """
1913     instance = self.instance
1914     force = self.op.force
1915     extra_args = getattr(self.op, "extra_args", "")
1916
1917     node_current = instance.primary_node
1918
1919     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1920     if not nodeinfo:
1921       raise errors.OpExecError("Could not contact node %s for infos" %
1922                                (node_current))
1923
1924     freememory = nodeinfo[node_current]['memory_free']
1925     memory = instance.memory
1926     if memory > freememory:
1927       raise errors.OpExecError("Not enough memory to start instance"
1928                                " %s on node %s"
1929                                " needed %s MiB, available %s MiB" %
1930                                (instance.name, node_current, memory,
1931                                 freememory))
1932
1933     _StartInstanceDisks(self.cfg, instance, force)
1934
1935     if not rpc.call_instance_start(node_current, instance, extra_args):
1936       _ShutdownInstanceDisks(instance, self.cfg)
1937       raise errors.OpExecError("Could not start instance")
1938
1939     self.cfg.MarkInstanceUp(instance.name)
1940
1941
1942 class LURebootInstance(LogicalUnit):
1943   """Reboot an instance.
1944
1945   """
1946   HPATH = "instance-reboot"
1947   HTYPE = constants.HTYPE_INSTANCE
1948   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
1949
1950   def BuildHooksEnv(self):
1951     """Build hooks env.
1952
1953     This runs on master, primary and secondary nodes of the instance.
1954
1955     """
1956     env = {
1957       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
1958       }
1959     env.update(_BuildInstanceHookEnvByObject(self.instance))
1960     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1961           list(self.instance.secondary_nodes))
1962     return env, nl, nl
1963
1964   def CheckPrereq(self):
1965     """Check prerequisites.
1966
1967     This checks that the instance is in the cluster.
1968
1969     """
1970     instance = self.cfg.GetInstanceInfo(
1971       self.cfg.ExpandInstanceName(self.op.instance_name))
1972     if instance is None:
1973       raise errors.OpPrereqError("Instance '%s' not known" %
1974                                  self.op.instance_name)
1975
1976     # check bridges existance
1977     _CheckInstanceBridgesExist(instance)
1978
1979     self.instance = instance
1980     self.op.instance_name = instance.name
1981
1982   def Exec(self, feedback_fn):
1983     """Reboot the instance.
1984
1985     """
1986     instance = self.instance
1987     ignore_secondaries = self.op.ignore_secondaries
1988     reboot_type = self.op.reboot_type
1989     extra_args = getattr(self.op, "extra_args", "")
1990
1991     node_current = instance.primary_node
1992
1993     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
1994                            constants.INSTANCE_REBOOT_HARD,
1995                            constants.INSTANCE_REBOOT_FULL]:
1996       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
1997                                   (constants.INSTANCE_REBOOT_SOFT,
1998                                    constants.INSTANCE_REBOOT_HARD,
1999                                    constants.INSTANCE_REBOOT_FULL))
2000
2001     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2002                        constants.INSTANCE_REBOOT_HARD]:
2003       if not rpc.call_instance_reboot(node_current, instance,
2004                                       reboot_type, extra_args):
2005         raise errors.OpExecError("Could not reboot instance")
2006     else:
2007       if not rpc.call_instance_shutdown(node_current, instance):
2008         raise errors.OpExecError("could not shutdown instance for full reboot")
2009       _ShutdownInstanceDisks(instance, self.cfg)
2010       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2011       if not rpc.call_instance_start(node_current, instance, extra_args):
2012         _ShutdownInstanceDisks(instance, self.cfg)
2013         raise errors.OpExecError("Could not start instance for full reboot")
2014
2015     self.cfg.MarkInstanceUp(instance.name)
2016
2017
2018 class LUShutdownInstance(LogicalUnit):
2019   """Shutdown an instance.
2020
2021   """
2022   HPATH = "instance-stop"
2023   HTYPE = constants.HTYPE_INSTANCE
2024   _OP_REQP = ["instance_name"]
2025
2026   def BuildHooksEnv(self):
2027     """Build hooks env.
2028
2029     This runs on master, primary and secondary nodes of the instance.
2030
2031     """
2032     env = _BuildInstanceHookEnvByObject(self.instance)
2033     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2034           list(self.instance.secondary_nodes))
2035     return env, nl, nl
2036
2037   def CheckPrereq(self):
2038     """Check prerequisites.
2039
2040     This checks that the instance is in the cluster.
2041
2042     """
2043     instance = self.cfg.GetInstanceInfo(
2044       self.cfg.ExpandInstanceName(self.op.instance_name))
2045     if instance is None:
2046       raise errors.OpPrereqError("Instance '%s' not known" %
2047                                  self.op.instance_name)
2048     self.instance = instance
2049
2050   def Exec(self, feedback_fn):
2051     """Shutdown the instance.
2052
2053     """
2054     instance = self.instance
2055     node_current = instance.primary_node
2056     if not rpc.call_instance_shutdown(node_current, instance):
2057       logger.Error("could not shutdown instance")
2058
2059     self.cfg.MarkInstanceDown(instance.name)
2060     _ShutdownInstanceDisks(instance, self.cfg)
2061
2062
2063 class LUReinstallInstance(LogicalUnit):
2064   """Reinstall an instance.
2065
2066   """
2067   HPATH = "instance-reinstall"
2068   HTYPE = constants.HTYPE_INSTANCE
2069   _OP_REQP = ["instance_name"]
2070
2071   def BuildHooksEnv(self):
2072     """Build hooks env.
2073
2074     This runs on master, primary and secondary nodes of the instance.
2075
2076     """
2077     env = _BuildInstanceHookEnvByObject(self.instance)
2078     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2079           list(self.instance.secondary_nodes))
2080     return env, nl, nl
2081
2082   def CheckPrereq(self):
2083     """Check prerequisites.
2084
2085     This checks that the instance is in the cluster and is not running.
2086
2087     """
2088     instance = self.cfg.GetInstanceInfo(
2089       self.cfg.ExpandInstanceName(self.op.instance_name))
2090     if instance is None:
2091       raise errors.OpPrereqError("Instance '%s' not known" %
2092                                  self.op.instance_name)
2093     if instance.disk_template == constants.DT_DISKLESS:
2094       raise errors.OpPrereqError("Instance '%s' has no disks" %
2095                                  self.op.instance_name)
2096     if instance.status != "down":
2097       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2098                                  self.op.instance_name)
2099     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2100     if remote_info:
2101       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2102                                  (self.op.instance_name,
2103                                   instance.primary_node))
2104
2105     self.op.os_type = getattr(self.op, "os_type", None)
2106     if self.op.os_type is not None:
2107       # OS verification
2108       pnode = self.cfg.GetNodeInfo(
2109         self.cfg.ExpandNodeName(instance.primary_node))
2110       if pnode is None:
2111         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2112                                    self.op.pnode)
2113       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2114       if not os_obj:
2115         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2116                                    " primary node"  % self.op.os_type)
2117
2118     self.instance = instance
2119
2120   def Exec(self, feedback_fn):
2121     """Reinstall the instance.
2122
2123     """
2124     inst = self.instance
2125
2126     if self.op.os_type is not None:
2127       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2128       inst.os = self.op.os_type
2129       self.cfg.AddInstance(inst)
2130
2131     _StartInstanceDisks(self.cfg, inst, None)
2132     try:
2133       feedback_fn("Running the instance OS create scripts...")
2134       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2135         raise errors.OpExecError("Could not install OS for instance %s "
2136                                  "on node %s" %
2137                                  (inst.name, inst.primary_node))
2138     finally:
2139       _ShutdownInstanceDisks(inst, self.cfg)
2140
2141
2142 class LURenameInstance(LogicalUnit):
2143   """Rename an instance.
2144
2145   """
2146   HPATH = "instance-rename"
2147   HTYPE = constants.HTYPE_INSTANCE
2148   _OP_REQP = ["instance_name", "new_name"]
2149
2150   def BuildHooksEnv(self):
2151     """Build hooks env.
2152
2153     This runs on master, primary and secondary nodes of the instance.
2154
2155     """
2156     env = _BuildInstanceHookEnvByObject(self.instance)
2157     env["INSTANCE_NEW_NAME"] = self.op.new_name
2158     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2159           list(self.instance.secondary_nodes))
2160     return env, nl, nl
2161
2162   def CheckPrereq(self):
2163     """Check prerequisites.
2164
2165     This checks that the instance is in the cluster and is not running.
2166
2167     """
2168     instance = self.cfg.GetInstanceInfo(
2169       self.cfg.ExpandInstanceName(self.op.instance_name))
2170     if instance is None:
2171       raise errors.OpPrereqError("Instance '%s' not known" %
2172                                  self.op.instance_name)
2173     if instance.status != "down":
2174       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2175                                  self.op.instance_name)
2176     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2177     if remote_info:
2178       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2179                                  (self.op.instance_name,
2180                                   instance.primary_node))
2181     self.instance = instance
2182
2183     # new name verification
2184     name_info = utils.HostInfo(self.op.new_name)
2185
2186     self.op.new_name = new_name = name_info.name
2187     if not getattr(self.op, "ignore_ip", False):
2188       command = ["fping", "-q", name_info.ip]
2189       result = utils.RunCmd(command)
2190       if not result.failed:
2191         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2192                                    (name_info.ip, new_name))
2193
2194
2195   def Exec(self, feedback_fn):
2196     """Reinstall the instance.
2197
2198     """
2199     inst = self.instance
2200     old_name = inst.name
2201
2202     self.cfg.RenameInstance(inst.name, self.op.new_name)
2203
2204     # re-read the instance from the configuration after rename
2205     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2206
2207     _StartInstanceDisks(self.cfg, inst, None)
2208     try:
2209       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2210                                           "sda", "sdb"):
2211         msg = ("Could run OS rename script for instance %s\n"
2212                "on node %s\n"
2213                "(but the instance has been renamed in Ganeti)" %
2214                (inst.name, inst.primary_node))
2215         logger.Error(msg)
2216     finally:
2217       _ShutdownInstanceDisks(inst, self.cfg)
2218
2219
2220 class LURemoveInstance(LogicalUnit):
2221   """Remove an instance.
2222
2223   """
2224   HPATH = "instance-remove"
2225   HTYPE = constants.HTYPE_INSTANCE
2226   _OP_REQP = ["instance_name"]
2227
2228   def BuildHooksEnv(self):
2229     """Build hooks env.
2230
2231     This runs on master, primary and secondary nodes of the instance.
2232
2233     """
2234     env = _BuildInstanceHookEnvByObject(self.instance)
2235     nl = [self.sstore.GetMasterNode()]
2236     return env, nl, nl
2237
2238   def CheckPrereq(self):
2239     """Check prerequisites.
2240
2241     This checks that the instance is in the cluster.
2242
2243     """
2244     instance = self.cfg.GetInstanceInfo(
2245       self.cfg.ExpandInstanceName(self.op.instance_name))
2246     if instance is None:
2247       raise errors.OpPrereqError("Instance '%s' not known" %
2248                                  self.op.instance_name)
2249     self.instance = instance
2250
2251   def Exec(self, feedback_fn):
2252     """Remove the instance.
2253
2254     """
2255     instance = self.instance
2256     logger.Info("shutting down instance %s on node %s" %
2257                 (instance.name, instance.primary_node))
2258
2259     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2260       if self.op.ignore_failures:
2261         feedback_fn("Warning: can't shutdown instance")
2262       else:
2263         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2264                                  (instance.name, instance.primary_node))
2265
2266     logger.Info("removing block devices for instance %s" % instance.name)
2267
2268     if not _RemoveDisks(instance, self.cfg):
2269       if self.op.ignore_failures:
2270         feedback_fn("Warning: can't remove instance's disks")
2271       else:
2272         raise errors.OpExecError("Can't remove instance's disks")
2273
2274     logger.Info("removing instance %s out of cluster config" % instance.name)
2275
2276     self.cfg.RemoveInstance(instance.name)
2277
2278
2279 class LUQueryInstances(NoHooksLU):
2280   """Logical unit for querying instances.
2281
2282   """
2283   _OP_REQP = ["output_fields", "names"]
2284
2285   def CheckPrereq(self):
2286     """Check prerequisites.
2287
2288     This checks that the fields required are valid output fields.
2289
2290     """
2291     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2292     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2293                                "admin_state", "admin_ram",
2294                                "disk_template", "ip", "mac", "bridge",
2295                                "sda_size", "sdb_size"],
2296                        dynamic=self.dynamic_fields,
2297                        selected=self.op.output_fields)
2298
2299     self.wanted = _GetWantedInstances(self, self.op.names)
2300
2301   def Exec(self, feedback_fn):
2302     """Computes the list of nodes and their attributes.
2303
2304     """
2305     instance_names = self.wanted
2306     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2307                      in instance_names]
2308
2309     # begin data gathering
2310
2311     nodes = frozenset([inst.primary_node for inst in instance_list])
2312
2313     bad_nodes = []
2314     if self.dynamic_fields.intersection(self.op.output_fields):
2315       live_data = {}
2316       node_data = rpc.call_all_instances_info(nodes)
2317       for name in nodes:
2318         result = node_data[name]
2319         if result:
2320           live_data.update(result)
2321         elif result == False:
2322           bad_nodes.append(name)
2323         # else no instance is alive
2324     else:
2325       live_data = dict([(name, {}) for name in instance_names])
2326
2327     # end data gathering
2328
2329     output = []
2330     for instance in instance_list:
2331       iout = []
2332       for field in self.op.output_fields:
2333         if field == "name":
2334           val = instance.name
2335         elif field == "os":
2336           val = instance.os
2337         elif field == "pnode":
2338           val = instance.primary_node
2339         elif field == "snodes":
2340           val = list(instance.secondary_nodes)
2341         elif field == "admin_state":
2342           val = (instance.status != "down")
2343         elif field == "oper_state":
2344           if instance.primary_node in bad_nodes:
2345             val = None
2346           else:
2347             val = bool(live_data.get(instance.name))
2348         elif field == "admin_ram":
2349           val = instance.memory
2350         elif field == "oper_ram":
2351           if instance.primary_node in bad_nodes:
2352             val = None
2353           elif instance.name in live_data:
2354             val = live_data[instance.name].get("memory", "?")
2355           else:
2356             val = "-"
2357         elif field == "disk_template":
2358           val = instance.disk_template
2359         elif field == "ip":
2360           val = instance.nics[0].ip
2361         elif field == "bridge":
2362           val = instance.nics[0].bridge
2363         elif field == "mac":
2364           val = instance.nics[0].mac
2365         elif field == "sda_size" or field == "sdb_size":
2366           disk = instance.FindDisk(field[:3])
2367           if disk is None:
2368             val = None
2369           else:
2370             val = disk.size
2371         else:
2372           raise errors.ParameterError(field)
2373         iout.append(val)
2374       output.append(iout)
2375
2376     return output
2377
2378
2379 class LUFailoverInstance(LogicalUnit):
2380   """Failover an instance.
2381
2382   """
2383   HPATH = "instance-failover"
2384   HTYPE = constants.HTYPE_INSTANCE
2385   _OP_REQP = ["instance_name", "ignore_consistency"]
2386
2387   def BuildHooksEnv(self):
2388     """Build hooks env.
2389
2390     This runs on master, primary and secondary nodes of the instance.
2391
2392     """
2393     env = {
2394       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2395       }
2396     env.update(_BuildInstanceHookEnvByObject(self.instance))
2397     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2398     return env, nl, nl
2399
2400   def CheckPrereq(self):
2401     """Check prerequisites.
2402
2403     This checks that the instance is in the cluster.
2404
2405     """
2406     instance = self.cfg.GetInstanceInfo(
2407       self.cfg.ExpandInstanceName(self.op.instance_name))
2408     if instance is None:
2409       raise errors.OpPrereqError("Instance '%s' not known" %
2410                                  self.op.instance_name)
2411
2412     if instance.disk_template not in constants.DTS_NET_MIRROR:
2413       raise errors.OpPrereqError("Instance's disk layout is not"
2414                                  " network mirrored, cannot failover.")
2415
2416     secondary_nodes = instance.secondary_nodes
2417     if not secondary_nodes:
2418       raise errors.ProgrammerError("no secondary node but using "
2419                                    "DT_REMOTE_RAID1 template")
2420
2421     # check memory requirements on the secondary node
2422     target_node = secondary_nodes[0]
2423     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2424     info = nodeinfo.get(target_node, None)
2425     if not info:
2426       raise errors.OpPrereqError("Cannot get current information"
2427                                  " from node '%s'" % nodeinfo)
2428     if instance.memory > info['memory_free']:
2429       raise errors.OpPrereqError("Not enough memory on target node %s."
2430                                  " %d MB available, %d MB required" %
2431                                  (target_node, info['memory_free'],
2432                                   instance.memory))
2433
2434     # check bridge existance
2435     brlist = [nic.bridge for nic in instance.nics]
2436     if not rpc.call_bridges_exist(target_node, brlist):
2437       raise errors.OpPrereqError("One or more target bridges %s does not"
2438                                  " exist on destination node '%s'" %
2439                                  (brlist, target_node))
2440
2441     self.instance = instance
2442
2443   def Exec(self, feedback_fn):
2444     """Failover an instance.
2445
2446     The failover is done by shutting it down on its present node and
2447     starting it on the secondary.
2448
2449     """
2450     instance = self.instance
2451
2452     source_node = instance.primary_node
2453     target_node = instance.secondary_nodes[0]
2454
2455     feedback_fn("* checking disk consistency between source and target")
2456     for dev in instance.disks:
2457       # for remote_raid1, these are md over drbd
2458       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2459         if not self.op.ignore_consistency:
2460           raise errors.OpExecError("Disk %s is degraded on target node,"
2461                                    " aborting failover." % dev.iv_name)
2462
2463     feedback_fn("* checking target node resource availability")
2464     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2465
2466     if not nodeinfo:
2467       raise errors.OpExecError("Could not contact target node %s." %
2468                                target_node)
2469
2470     free_memory = int(nodeinfo[target_node]['memory_free'])
2471     memory = instance.memory
2472     if memory > free_memory:
2473       raise errors.OpExecError("Not enough memory to create instance %s on"
2474                                " node %s. needed %s MiB, available %s MiB" %
2475                                (instance.name, target_node, memory,
2476                                 free_memory))
2477
2478     feedback_fn("* shutting down instance on source node")
2479     logger.Info("Shutting down instance %s on node %s" %
2480                 (instance.name, source_node))
2481
2482     if not rpc.call_instance_shutdown(source_node, instance):
2483       if self.op.ignore_consistency:
2484         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2485                      " anyway. Please make sure node %s is down"  %
2486                      (instance.name, source_node, source_node))
2487       else:
2488         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2489                                  (instance.name, source_node))
2490
2491     feedback_fn("* deactivating the instance's disks on source node")
2492     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2493       raise errors.OpExecError("Can't shut down the instance's disks.")
2494
2495     instance.primary_node = target_node
2496     # distribute new instance config to the other nodes
2497     self.cfg.AddInstance(instance)
2498
2499     feedback_fn("* activating the instance's disks on target node")
2500     logger.Info("Starting instance %s on node %s" %
2501                 (instance.name, target_node))
2502
2503     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2504                                              ignore_secondaries=True)
2505     if not disks_ok:
2506       _ShutdownInstanceDisks(instance, self.cfg)
2507       raise errors.OpExecError("Can't activate the instance's disks")
2508
2509     feedback_fn("* starting the instance on the target node")
2510     if not rpc.call_instance_start(target_node, instance, None):
2511       _ShutdownInstanceDisks(instance, self.cfg)
2512       raise errors.OpExecError("Could not start instance %s on node %s." %
2513                                (instance.name, target_node))
2514
2515
2516 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2517   """Create a tree of block devices on the primary node.
2518
2519   This always creates all devices.
2520
2521   """
2522   if device.children:
2523     for child in device.children:
2524       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2525         return False
2526
2527   cfg.SetDiskID(device, node)
2528   new_id = rpc.call_blockdev_create(node, device, device.size,
2529                                     instance.name, True, info)
2530   if not new_id:
2531     return False
2532   if device.physical_id is None:
2533     device.physical_id = new_id
2534   return True
2535
2536
2537 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2538   """Create a tree of block devices on a secondary node.
2539
2540   If this device type has to be created on secondaries, create it and
2541   all its children.
2542
2543   If not, just recurse to children keeping the same 'force' value.
2544
2545   """
2546   if device.CreateOnSecondary():
2547     force = True
2548   if device.children:
2549     for child in device.children:
2550       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2551                                         child, force, info):
2552         return False
2553
2554   if not force:
2555     return True
2556   cfg.SetDiskID(device, node)
2557   new_id = rpc.call_blockdev_create(node, device, device.size,
2558                                     instance.name, False, info)
2559   if not new_id:
2560     return False
2561   if device.physical_id is None:
2562     device.physical_id = new_id
2563   return True
2564
2565
2566 def _GenerateUniqueNames(cfg, exts):
2567   """Generate a suitable LV name.
2568
2569   This will generate a logical volume name for the given instance.
2570
2571   """
2572   results = []
2573   for val in exts:
2574     new_id = cfg.GenerateUniqueID()
2575     results.append("%s%s" % (new_id, val))
2576   return results
2577
2578
2579 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2580   """Generate a drbd device complete with its children.
2581
2582   """
2583   port = cfg.AllocatePort()
2584   vgname = cfg.GetVGName()
2585   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2586                           logical_id=(vgname, names[0]))
2587   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2588                           logical_id=(vgname, names[1]))
2589   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2590                           logical_id = (primary, secondary, port),
2591                           children = [dev_data, dev_meta])
2592   return drbd_dev
2593
2594
2595 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2596   """Generate a drbd8 device complete with its children.
2597
2598   """
2599   port = cfg.AllocatePort()
2600   vgname = cfg.GetVGName()
2601   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2602                           logical_id=(vgname, names[0]))
2603   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2604                           logical_id=(vgname, names[1]))
2605   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2606                           logical_id = (primary, secondary, port),
2607                           children = [dev_data, dev_meta],
2608                           iv_name=iv_name)
2609   return drbd_dev
2610
2611 def _GenerateDiskTemplate(cfg, template_name,
2612                           instance_name, primary_node,
2613                           secondary_nodes, disk_sz, swap_sz):
2614   """Generate the entire disk layout for a given template type.
2615
2616   """
2617   #TODO: compute space requirements
2618
2619   vgname = cfg.GetVGName()
2620   if template_name == "diskless":
2621     disks = []
2622   elif template_name == "plain":
2623     if len(secondary_nodes) != 0:
2624       raise errors.ProgrammerError("Wrong template configuration")
2625
2626     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2627     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2628                            logical_id=(vgname, names[0]),
2629                            iv_name = "sda")
2630     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2631                            logical_id=(vgname, names[1]),
2632                            iv_name = "sdb")
2633     disks = [sda_dev, sdb_dev]
2634   elif template_name == "local_raid1":
2635     if len(secondary_nodes) != 0:
2636       raise errors.ProgrammerError("Wrong template configuration")
2637
2638
2639     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2640                                        ".sdb_m1", ".sdb_m2"])
2641     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2642                               logical_id=(vgname, names[0]))
2643     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2644                               logical_id=(vgname, names[1]))
2645     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2646                               size=disk_sz,
2647                               children = [sda_dev_m1, sda_dev_m2])
2648     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2649                               logical_id=(vgname, names[2]))
2650     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2651                               logical_id=(vgname, names[3]))
2652     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2653                               size=swap_sz,
2654                               children = [sdb_dev_m1, sdb_dev_m2])
2655     disks = [md_sda_dev, md_sdb_dev]
2656   elif template_name == constants.DT_REMOTE_RAID1:
2657     if len(secondary_nodes) != 1:
2658       raise errors.ProgrammerError("Wrong template configuration")
2659     remote_node = secondary_nodes[0]
2660     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2661                                        ".sdb_data", ".sdb_meta"])
2662     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2663                                          disk_sz, names[0:2])
2664     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2665                               children = [drbd_sda_dev], size=disk_sz)
2666     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2667                                          swap_sz, names[2:4])
2668     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2669                               children = [drbd_sdb_dev], size=swap_sz)
2670     disks = [md_sda_dev, md_sdb_dev]
2671   elif template_name == constants.DT_DRBD8:
2672     if len(secondary_nodes) != 1:
2673       raise errors.ProgrammerError("Wrong template configuration")
2674     remote_node = secondary_nodes[0]
2675     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2676                                        ".sdb_data", ".sdb_meta"])
2677     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2678                                          disk_sz, names[0:2], "sda")
2679     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2680                                          swap_sz, names[2:4], "sdb")
2681     disks = [drbd_sda_dev, drbd_sdb_dev]
2682   else:
2683     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2684   return disks
2685
2686
2687 def _GetInstanceInfoText(instance):
2688   """Compute that text that should be added to the disk's metadata.
2689
2690   """
2691   return "originstname+%s" % instance.name
2692
2693
2694 def _CreateDisks(cfg, instance):
2695   """Create all disks for an instance.
2696
2697   This abstracts away some work from AddInstance.
2698
2699   Args:
2700     instance: the instance object
2701
2702   Returns:
2703     True or False showing the success of the creation process
2704
2705   """
2706   info = _GetInstanceInfoText(instance)
2707
2708   for device in instance.disks:
2709     logger.Info("creating volume %s for instance %s" %
2710               (device.iv_name, instance.name))
2711     #HARDCODE
2712     for secondary_node in instance.secondary_nodes:
2713       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2714                                         device, False, info):
2715         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2716                      (device.iv_name, device, secondary_node))
2717         return False
2718     #HARDCODE
2719     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2720                                     instance, device, info):
2721       logger.Error("failed to create volume %s on primary!" %
2722                    device.iv_name)
2723       return False
2724   return True
2725
2726
2727 def _RemoveDisks(instance, cfg):
2728   """Remove all disks for an instance.
2729
2730   This abstracts away some work from `AddInstance()` and
2731   `RemoveInstance()`. Note that in case some of the devices couldn't
2732   be removed, the removal will continue with the other ones (compare
2733   with `_CreateDisks()`).
2734
2735   Args:
2736     instance: the instance object
2737
2738   Returns:
2739     True or False showing the success of the removal proces
2740
2741   """
2742   logger.Info("removing block devices for instance %s" % instance.name)
2743
2744   result = True
2745   for device in instance.disks:
2746     for node, disk in device.ComputeNodeTree(instance.primary_node):
2747       cfg.SetDiskID(disk, node)
2748       if not rpc.call_blockdev_remove(node, disk):
2749         logger.Error("could not remove block device %s on node %s,"
2750                      " continuing anyway" %
2751                      (device.iv_name, node))
2752         result = False
2753   return result
2754
2755
2756 class LUCreateInstance(LogicalUnit):
2757   """Create an instance.
2758
2759   """
2760   HPATH = "instance-add"
2761   HTYPE = constants.HTYPE_INSTANCE
2762   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2763               "disk_template", "swap_size", "mode", "start", "vcpus",
2764               "wait_for_sync", "ip_check"]
2765
2766   def BuildHooksEnv(self):
2767     """Build hooks env.
2768
2769     This runs on master, primary and secondary nodes of the instance.
2770
2771     """
2772     env = {
2773       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2774       "INSTANCE_DISK_SIZE": self.op.disk_size,
2775       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2776       "INSTANCE_ADD_MODE": self.op.mode,
2777       }
2778     if self.op.mode == constants.INSTANCE_IMPORT:
2779       env["INSTANCE_SRC_NODE"] = self.op.src_node
2780       env["INSTANCE_SRC_PATH"] = self.op.src_path
2781       env["INSTANCE_SRC_IMAGE"] = self.src_image
2782
2783     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2784       primary_node=self.op.pnode,
2785       secondary_nodes=self.secondaries,
2786       status=self.instance_status,
2787       os_type=self.op.os_type,
2788       memory=self.op.mem_size,
2789       vcpus=self.op.vcpus,
2790       nics=[(self.inst_ip, self.op.bridge)],
2791     ))
2792
2793     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2794           self.secondaries)
2795     return env, nl, nl
2796
2797
2798   def CheckPrereq(self):
2799     """Check prerequisites.
2800
2801     """
2802     if self.op.mode not in (constants.INSTANCE_CREATE,
2803                             constants.INSTANCE_IMPORT):
2804       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2805                                  self.op.mode)
2806
2807     if self.op.mode == constants.INSTANCE_IMPORT:
2808       src_node = getattr(self.op, "src_node", None)
2809       src_path = getattr(self.op, "src_path", None)
2810       if src_node is None or src_path is None:
2811         raise errors.OpPrereqError("Importing an instance requires source"
2812                                    " node and path options")
2813       src_node_full = self.cfg.ExpandNodeName(src_node)
2814       if src_node_full is None:
2815         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2816       self.op.src_node = src_node = src_node_full
2817
2818       if not os.path.isabs(src_path):
2819         raise errors.OpPrereqError("The source path must be absolute")
2820
2821       export_info = rpc.call_export_info(src_node, src_path)
2822
2823       if not export_info:
2824         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2825
2826       if not export_info.has_section(constants.INISECT_EXP):
2827         raise errors.ProgrammerError("Corrupted export config")
2828
2829       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2830       if (int(ei_version) != constants.EXPORT_VERSION):
2831         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2832                                    (ei_version, constants.EXPORT_VERSION))
2833
2834       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2835         raise errors.OpPrereqError("Can't import instance with more than"
2836                                    " one data disk")
2837
2838       # FIXME: are the old os-es, disk sizes, etc. useful?
2839       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2840       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2841                                                          'disk0_dump'))
2842       self.src_image = diskimage
2843     else: # INSTANCE_CREATE
2844       if getattr(self.op, "os_type", None) is None:
2845         raise errors.OpPrereqError("No guest OS specified")
2846
2847     # check primary node
2848     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2849     if pnode is None:
2850       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2851                                  self.op.pnode)
2852     self.op.pnode = pnode.name
2853     self.pnode = pnode
2854     self.secondaries = []
2855     # disk template and mirror node verification
2856     if self.op.disk_template not in constants.DISK_TEMPLATES:
2857       raise errors.OpPrereqError("Invalid disk template name")
2858
2859     if self.op.disk_template in constants.DTS_NET_MIRROR:
2860       if getattr(self.op, "snode", None) is None:
2861         raise errors.OpPrereqError("The networked disk templates need"
2862                                    " a mirror node")
2863
2864       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2865       if snode_name is None:
2866         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2867                                    self.op.snode)
2868       elif snode_name == pnode.name:
2869         raise errors.OpPrereqError("The secondary node cannot be"
2870                                    " the primary node.")
2871       self.secondaries.append(snode_name)
2872
2873     # Check lv size requirements
2874     nodenames = [pnode.name] + self.secondaries
2875     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2876
2877     # Required free disk space as a function of disk and swap space
2878     req_size_dict = {
2879       constants.DT_DISKLESS: 0,
2880       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2881       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2882       # 256 MB are added for drbd metadata, 128MB for each drbd device
2883       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2884       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2885     }
2886
2887     if self.op.disk_template not in req_size_dict:
2888       raise errors.ProgrammerError("Disk template '%s' size requirement"
2889                                    " is unknown" %  self.op.disk_template)
2890
2891     req_size = req_size_dict[self.op.disk_template]
2892
2893     for node in nodenames:
2894       info = nodeinfo.get(node, None)
2895       if not info:
2896         raise errors.OpPrereqError("Cannot get current information"
2897                                    " from node '%s'" % nodeinfo)
2898       if req_size > info['vg_free']:
2899         raise errors.OpPrereqError("Not enough disk space on target node %s."
2900                                    " %d MB available, %d MB required" %
2901                                    (node, info['vg_free'], req_size))
2902
2903     # os verification
2904     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2905     if not os_obj:
2906       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2907                                  " primary node"  % self.op.os_type)
2908
2909     # instance verification
2910     hostname1 = utils.HostInfo(self.op.instance_name)
2911
2912     self.op.instance_name = instance_name = hostname1.name
2913     instance_list = self.cfg.GetInstanceList()
2914     if instance_name in instance_list:
2915       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2916                                  instance_name)
2917
2918     ip = getattr(self.op, "ip", None)
2919     if ip is None or ip.lower() == "none":
2920       inst_ip = None
2921     elif ip.lower() == "auto":
2922       inst_ip = hostname1.ip
2923     else:
2924       if not utils.IsValidIP(ip):
2925         raise errors.OpPrereqError("given IP address '%s' doesn't look"
2926                                    " like a valid IP" % ip)
2927       inst_ip = ip
2928     self.inst_ip = inst_ip
2929
2930     if self.op.start and not self.op.ip_check:
2931       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
2932                                  " adding an instance in start mode")
2933
2934     if self.op.ip_check:
2935       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
2936                        constants.DEFAULT_NODED_PORT):
2937         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2938                                    (hostname1.ip, instance_name))
2939
2940     # bridge verification
2941     bridge = getattr(self.op, "bridge", None)
2942     if bridge is None:
2943       self.op.bridge = self.cfg.GetDefBridge()
2944     else:
2945       self.op.bridge = bridge
2946
2947     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
2948       raise errors.OpPrereqError("target bridge '%s' does not exist on"
2949                                  " destination node '%s'" %
2950                                  (self.op.bridge, pnode.name))
2951
2952     if self.op.start:
2953       self.instance_status = 'up'
2954     else:
2955       self.instance_status = 'down'
2956
2957   def Exec(self, feedback_fn):
2958     """Create and add the instance to the cluster.
2959
2960     """
2961     instance = self.op.instance_name
2962     pnode_name = self.pnode.name
2963
2964     nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
2965     if self.inst_ip is not None:
2966       nic.ip = self.inst_ip
2967
2968     disks = _GenerateDiskTemplate(self.cfg,
2969                                   self.op.disk_template,
2970                                   instance, pnode_name,
2971                                   self.secondaries, self.op.disk_size,
2972                                   self.op.swap_size)
2973
2974     iobj = objects.Instance(name=instance, os=self.op.os_type,
2975                             primary_node=pnode_name,
2976                             memory=self.op.mem_size,
2977                             vcpus=self.op.vcpus,
2978                             nics=[nic], disks=disks,
2979                             disk_template=self.op.disk_template,
2980                             status=self.instance_status,
2981                             )
2982
2983     feedback_fn("* creating instance disks...")
2984     if not _CreateDisks(self.cfg, iobj):
2985       _RemoveDisks(iobj, self.cfg)
2986       raise errors.OpExecError("Device creation failed, reverting...")
2987
2988     feedback_fn("adding instance %s to cluster config" % instance)
2989
2990     self.cfg.AddInstance(iobj)
2991
2992     if self.op.wait_for_sync:
2993       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
2994     elif iobj.disk_template in constants.DTS_NET_MIRROR:
2995       # make sure the disks are not degraded (still sync-ing is ok)
2996       time.sleep(15)
2997       feedback_fn("* checking mirrors status")
2998       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
2999     else:
3000       disk_abort = False
3001
3002     if disk_abort:
3003       _RemoveDisks(iobj, self.cfg)
3004       self.cfg.RemoveInstance(iobj.name)
3005       raise errors.OpExecError("There are some degraded disks for"
3006                                " this instance")
3007
3008     feedback_fn("creating os for instance %s on node %s" %
3009                 (instance, pnode_name))
3010
3011     if iobj.disk_template != constants.DT_DISKLESS:
3012       if self.op.mode == constants.INSTANCE_CREATE:
3013         feedback_fn("* running the instance OS create scripts...")
3014         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3015           raise errors.OpExecError("could not add os for instance %s"
3016                                    " on node %s" %
3017                                    (instance, pnode_name))
3018
3019       elif self.op.mode == constants.INSTANCE_IMPORT:
3020         feedback_fn("* running the instance OS import scripts...")
3021         src_node = self.op.src_node
3022         src_image = self.src_image
3023         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3024                                                 src_node, src_image):
3025           raise errors.OpExecError("Could not import os for instance"
3026                                    " %s on node %s" %
3027                                    (instance, pnode_name))
3028       else:
3029         # also checked in the prereq part
3030         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3031                                      % self.op.mode)
3032
3033     if self.op.start:
3034       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3035       feedback_fn("* starting instance...")
3036       if not rpc.call_instance_start(pnode_name, iobj, None):
3037         raise errors.OpExecError("Could not start instance")
3038
3039
3040 class LUConnectConsole(NoHooksLU):
3041   """Connect to an instance's console.
3042
3043   This is somewhat special in that it returns the command line that
3044   you need to run on the master node in order to connect to the
3045   console.
3046
3047   """
3048   _OP_REQP = ["instance_name"]
3049
3050   def CheckPrereq(self):
3051     """Check prerequisites.
3052
3053     This checks that the instance is in the cluster.
3054
3055     """
3056     instance = self.cfg.GetInstanceInfo(
3057       self.cfg.ExpandInstanceName(self.op.instance_name))
3058     if instance is None:
3059       raise errors.OpPrereqError("Instance '%s' not known" %
3060                                  self.op.instance_name)
3061     self.instance = instance
3062
3063   def Exec(self, feedback_fn):
3064     """Connect to the console of an instance
3065
3066     """
3067     instance = self.instance
3068     node = instance.primary_node
3069
3070     node_insts = rpc.call_instance_list([node])[node]
3071     if node_insts is False:
3072       raise errors.OpExecError("Can't connect to node %s." % node)
3073
3074     if instance.name not in node_insts:
3075       raise errors.OpExecError("Instance %s is not running." % instance.name)
3076
3077     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3078
3079     hyper = hypervisor.GetHypervisor()
3080     console_cmd = hyper.GetShellCommandForConsole(instance.name)
3081     # build ssh cmdline
3082     argv = ["ssh", "-q", "-t"]
3083     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3084     argv.extend(ssh.BATCH_MODE_OPTS)
3085     argv.append(node)
3086     argv.append(console_cmd)
3087     return "ssh", argv
3088
3089
3090 class LUAddMDDRBDComponent(LogicalUnit):
3091   """Adda new mirror member to an instance's disk.
3092
3093   """
3094   HPATH = "mirror-add"
3095   HTYPE = constants.HTYPE_INSTANCE
3096   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3097
3098   def BuildHooksEnv(self):
3099     """Build hooks env.
3100
3101     This runs on the master, the primary and all the secondaries.
3102
3103     """
3104     env = {
3105       "NEW_SECONDARY": self.op.remote_node,
3106       "DISK_NAME": self.op.disk_name,
3107       }
3108     env.update(_BuildInstanceHookEnvByObject(self.instance))
3109     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3110           self.op.remote_node,] + list(self.instance.secondary_nodes)
3111     return env, nl, nl
3112
3113   def CheckPrereq(self):
3114     """Check prerequisites.
3115
3116     This checks that the instance is in the cluster.
3117
3118     """
3119     instance = self.cfg.GetInstanceInfo(
3120       self.cfg.ExpandInstanceName(self.op.instance_name))
3121     if instance is None:
3122       raise errors.OpPrereqError("Instance '%s' not known" %
3123                                  self.op.instance_name)
3124     self.instance = instance
3125
3126     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3127     if remote_node is None:
3128       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3129     self.remote_node = remote_node
3130
3131     if remote_node == instance.primary_node:
3132       raise errors.OpPrereqError("The specified node is the primary node of"
3133                                  " the instance.")
3134
3135     if instance.disk_template != constants.DT_REMOTE_RAID1:
3136       raise errors.OpPrereqError("Instance's disk layout is not"
3137                                  " remote_raid1.")
3138     for disk in instance.disks:
3139       if disk.iv_name == self.op.disk_name:
3140         break
3141     else:
3142       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3143                                  " instance." % self.op.disk_name)
3144     if len(disk.children) > 1:
3145       raise errors.OpPrereqError("The device already has two slave"
3146                                  " devices.\n"
3147                                  "This would create a 3-disk raid1"
3148                                  " which we don't allow.")
3149     self.disk = disk
3150
3151   def Exec(self, feedback_fn):
3152     """Add the mirror component
3153
3154     """
3155     disk = self.disk
3156     instance = self.instance
3157
3158     remote_node = self.remote_node
3159     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3160     names = _GenerateUniqueNames(self.cfg, lv_names)
3161     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3162                                      remote_node, disk.size, names)
3163
3164     logger.Info("adding new mirror component on secondary")
3165     #HARDCODE
3166     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3167                                       new_drbd, False,
3168                                       _GetInstanceInfoText(instance)):
3169       raise errors.OpExecError("Failed to create new component on secondary"
3170                                " node %s" % remote_node)
3171
3172     logger.Info("adding new mirror component on primary")
3173     #HARDCODE
3174     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3175                                     instance, new_drbd,
3176                                     _GetInstanceInfoText(instance)):
3177       # remove secondary dev
3178       self.cfg.SetDiskID(new_drbd, remote_node)
3179       rpc.call_blockdev_remove(remote_node, new_drbd)
3180       raise errors.OpExecError("Failed to create volume on primary")
3181
3182     # the device exists now
3183     # call the primary node to add the mirror to md
3184     logger.Info("adding new mirror component to md")
3185     if not rpc.call_blockdev_addchildren(instance.primary_node,
3186                                          disk, [new_drbd]):
3187       logger.Error("Can't add mirror compoment to md!")
3188       self.cfg.SetDiskID(new_drbd, remote_node)
3189       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3190         logger.Error("Can't rollback on secondary")
3191       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3192       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3193         logger.Error("Can't rollback on primary")
3194       raise errors.OpExecError("Can't add mirror component to md array")
3195
3196     disk.children.append(new_drbd)
3197
3198     self.cfg.AddInstance(instance)
3199
3200     _WaitForSync(self.cfg, instance, self.proc)
3201
3202     return 0
3203
3204
3205 class LURemoveMDDRBDComponent(LogicalUnit):
3206   """Remove a component from a remote_raid1 disk.
3207
3208   """
3209   HPATH = "mirror-remove"
3210   HTYPE = constants.HTYPE_INSTANCE
3211   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3212
3213   def BuildHooksEnv(self):
3214     """Build hooks env.
3215
3216     This runs on the master, the primary and all the secondaries.
3217
3218     """
3219     env = {
3220       "DISK_NAME": self.op.disk_name,
3221       "DISK_ID": self.op.disk_id,
3222       "OLD_SECONDARY": self.old_secondary,
3223       }
3224     env.update(_BuildInstanceHookEnvByObject(self.instance))
3225     nl = [self.sstore.GetMasterNode(),
3226           self.instance.primary_node] + list(self.instance.secondary_nodes)
3227     return env, nl, nl
3228
3229   def CheckPrereq(self):
3230     """Check prerequisites.
3231
3232     This checks that the instance is in the cluster.
3233
3234     """
3235     instance = self.cfg.GetInstanceInfo(
3236       self.cfg.ExpandInstanceName(self.op.instance_name))
3237     if instance is None:
3238       raise errors.OpPrereqError("Instance '%s' not known" %
3239                                  self.op.instance_name)
3240     self.instance = instance
3241
3242     if instance.disk_template != constants.DT_REMOTE_RAID1:
3243       raise errors.OpPrereqError("Instance's disk layout is not"
3244                                  " remote_raid1.")
3245     for disk in instance.disks:
3246       if disk.iv_name == self.op.disk_name:
3247         break
3248     else:
3249       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3250                                  " instance." % self.op.disk_name)
3251     for child in disk.children:
3252       if (child.dev_type == constants.LD_DRBD7 and
3253           child.logical_id[2] == self.op.disk_id):
3254         break
3255     else:
3256       raise errors.OpPrereqError("Can't find the device with this port.")
3257
3258     if len(disk.children) < 2:
3259       raise errors.OpPrereqError("Cannot remove the last component from"
3260                                  " a mirror.")
3261     self.disk = disk
3262     self.child = child
3263     if self.child.logical_id[0] == instance.primary_node:
3264       oid = 1
3265     else:
3266       oid = 0
3267     self.old_secondary = self.child.logical_id[oid]
3268
3269   def Exec(self, feedback_fn):
3270     """Remove the mirror component
3271
3272     """
3273     instance = self.instance
3274     disk = self.disk
3275     child = self.child
3276     logger.Info("remove mirror component")
3277     self.cfg.SetDiskID(disk, instance.primary_node)
3278     if not rpc.call_blockdev_removechildren(instance.primary_node,
3279                                             disk, [child]):
3280       raise errors.OpExecError("Can't remove child from mirror.")
3281
3282     for node in child.logical_id[:2]:
3283       self.cfg.SetDiskID(child, node)
3284       if not rpc.call_blockdev_remove(node, child):
3285         logger.Error("Warning: failed to remove device from node %s,"
3286                      " continuing operation." % node)
3287
3288     disk.children.remove(child)
3289     self.cfg.AddInstance(instance)
3290
3291
3292 class LUReplaceDisks(LogicalUnit):
3293   """Replace the disks of an instance.
3294
3295   """
3296   HPATH = "mirrors-replace"
3297   HTYPE = constants.HTYPE_INSTANCE
3298   _OP_REQP = ["instance_name", "mode", "disks"]
3299
3300   def BuildHooksEnv(self):
3301     """Build hooks env.
3302
3303     This runs on the master, the primary and all the secondaries.
3304
3305     """
3306     env = {
3307       "MODE": self.op.mode,
3308       "NEW_SECONDARY": self.op.remote_node,
3309       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3310       }
3311     env.update(_BuildInstanceHookEnvByObject(self.instance))
3312     nl = [
3313       self.sstore.GetMasterNode(),
3314       self.instance.primary_node,
3315       ]
3316     if self.op.remote_node is not None:
3317       nl.append(self.op.remote_node)
3318     return env, nl, nl
3319
3320   def CheckPrereq(self):
3321     """Check prerequisites.
3322
3323     This checks that the instance is in the cluster.
3324
3325     """
3326     instance = self.cfg.GetInstanceInfo(
3327       self.cfg.ExpandInstanceName(self.op.instance_name))
3328     if instance is None:
3329       raise errors.OpPrereqError("Instance '%s' not known" %
3330                                  self.op.instance_name)
3331     self.instance = instance
3332     self.op.instance_name = instance.name
3333
3334     if instance.disk_template not in constants.DTS_NET_MIRROR:
3335       raise errors.OpPrereqError("Instance's disk layout is not"
3336                                  " network mirrored.")
3337
3338     if len(instance.secondary_nodes) != 1:
3339       raise errors.OpPrereqError("The instance has a strange layout,"
3340                                  " expected one secondary but found %d" %
3341                                  len(instance.secondary_nodes))
3342
3343     self.sec_node = instance.secondary_nodes[0]
3344
3345     remote_node = getattr(self.op, "remote_node", None)
3346     if remote_node is not None:
3347       remote_node = self.cfg.ExpandNodeName(remote_node)
3348       if remote_node is None:
3349         raise errors.OpPrereqError("Node '%s' not known" %
3350                                    self.op.remote_node)
3351       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3352     else:
3353       self.remote_node_info = None
3354     if remote_node == instance.primary_node:
3355       raise errors.OpPrereqError("The specified node is the primary node of"
3356                                  " the instance.")
3357     elif remote_node == self.sec_node:
3358       if self.op.mode == constants.REPLACE_DISK_SEC:
3359         # this is for DRBD8, where we can't execute the same mode of
3360         # replacement as for drbd7 (no different port allocated)
3361         raise errors.OpPrereqError("Same secondary given, cannot execute"
3362                                    " replacement")
3363       # the user gave the current secondary, switch to
3364       # 'no-replace-secondary' mode for drbd7
3365       remote_node = None
3366     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3367         self.op.mode != constants.REPLACE_DISK_ALL):
3368       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3369                                  " disks replacement, not individual ones")
3370     if instance.disk_template == constants.DT_DRBD8:
3371       if (self.op.mode == constants.REPLACE_DISK_ALL and
3372           remote_node is not None):
3373         # switch to replace secondary mode
3374         self.op.mode = constants.REPLACE_DISK_SEC
3375
3376       if self.op.mode == constants.REPLACE_DISK_ALL:
3377         raise errors.OpPrereqError("Template 'drbd8' only allows primary or"
3378                                    " secondary disk replacement, not"
3379                                    " both at once")
3380       elif self.op.mode == constants.REPLACE_DISK_PRI:
3381         if remote_node is not None:
3382           raise errors.OpPrereqError("Template 'drbd8' does not allow changing"
3383                                      " the secondary while doing a primary"
3384                                      " node disk replacement")
3385         self.tgt_node = instance.primary_node
3386         self.oth_node = instance.secondary_nodes[0]
3387       elif self.op.mode == constants.REPLACE_DISK_SEC:
3388         self.new_node = remote_node # this can be None, in which case
3389                                     # we don't change the secondary
3390         self.tgt_node = instance.secondary_nodes[0]
3391         self.oth_node = instance.primary_node
3392       else:
3393         raise errors.ProgrammerError("Unhandled disk replace mode")
3394
3395     for name in self.op.disks:
3396       if instance.FindDisk(name) is None:
3397         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3398                                    (name, instance.name))
3399     self.op.remote_node = remote_node
3400
3401   def _ExecRR1(self, feedback_fn):
3402     """Replace the disks of an instance.
3403
3404     """
3405     instance = self.instance
3406     iv_names = {}
3407     # start of work
3408     if self.op.remote_node is None:
3409       remote_node = self.sec_node
3410     else:
3411       remote_node = self.op.remote_node
3412     cfg = self.cfg
3413     for dev in instance.disks:
3414       size = dev.size
3415       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3416       names = _GenerateUniqueNames(cfg, lv_names)
3417       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3418                                        remote_node, size, names)
3419       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3420       logger.Info("adding new mirror component on secondary for %s" %
3421                   dev.iv_name)
3422       #HARDCODE
3423       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3424                                         new_drbd, False,
3425                                         _GetInstanceInfoText(instance)):
3426         raise errors.OpExecError("Failed to create new component on"
3427                                  " secondary node %s\n"
3428                                  "Full abort, cleanup manually!" %
3429                                  remote_node)
3430
3431       logger.Info("adding new mirror component on primary")
3432       #HARDCODE
3433       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3434                                       instance, new_drbd,
3435                                       _GetInstanceInfoText(instance)):
3436         # remove secondary dev
3437         cfg.SetDiskID(new_drbd, remote_node)
3438         rpc.call_blockdev_remove(remote_node, new_drbd)
3439         raise errors.OpExecError("Failed to create volume on primary!\n"
3440                                  "Full abort, cleanup manually!!")
3441
3442       # the device exists now
3443       # call the primary node to add the mirror to md
3444       logger.Info("adding new mirror component to md")
3445       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3446                                            [new_drbd]):
3447         logger.Error("Can't add mirror compoment to md!")
3448         cfg.SetDiskID(new_drbd, remote_node)
3449         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3450           logger.Error("Can't rollback on secondary")
3451         cfg.SetDiskID(new_drbd, instance.primary_node)
3452         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3453           logger.Error("Can't rollback on primary")
3454         raise errors.OpExecError("Full abort, cleanup manually!!")
3455
3456       dev.children.append(new_drbd)
3457       cfg.AddInstance(instance)
3458
3459     # this can fail as the old devices are degraded and _WaitForSync
3460     # does a combined result over all disks, so we don't check its
3461     # return value
3462     _WaitForSync(cfg, instance, self.proc, unlock=True)
3463
3464     # so check manually all the devices
3465     for name in iv_names:
3466       dev, child, new_drbd = iv_names[name]
3467       cfg.SetDiskID(dev, instance.primary_node)
3468       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3469       if is_degr:
3470         raise errors.OpExecError("MD device %s is degraded!" % name)
3471       cfg.SetDiskID(new_drbd, instance.primary_node)
3472       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3473       if is_degr:
3474         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3475
3476     for name in iv_names:
3477       dev, child, new_drbd = iv_names[name]
3478       logger.Info("remove mirror %s component" % name)
3479       cfg.SetDiskID(dev, instance.primary_node)
3480       if not rpc.call_blockdev_removechildren(instance.primary_node,
3481                                               dev, [child]):
3482         logger.Error("Can't remove child from mirror, aborting"
3483                      " *this device cleanup*.\nYou need to cleanup manually!!")
3484         continue
3485
3486       for node in child.logical_id[:2]:
3487         logger.Info("remove child device on %s" % node)
3488         cfg.SetDiskID(child, node)
3489         if not rpc.call_blockdev_remove(node, child):
3490           logger.Error("Warning: failed to remove device from node %s,"
3491                        " continuing operation." % node)
3492
3493       dev.children.remove(child)
3494
3495       cfg.AddInstance(instance)
3496
3497   def _ExecD8DiskOnly(self, feedback_fn):
3498     """Replace a disk on the primary or secondary for dbrd8.
3499
3500     The algorithm for replace is quite complicated:
3501       - for each disk to be replaced:
3502         - create new LVs on the target node with unique names
3503         - detach old LVs from the drbd device
3504         - rename old LVs to name_replaced.<time_t>
3505         - rename new LVs to old LVs
3506         - attach the new LVs (with the old names now) to the drbd device
3507       - wait for sync across all devices
3508       - for each modified disk:
3509         - remove old LVs (which have the name name_replaces.<time_t>)
3510
3511     Failures are not very well handled.
3512
3513     """
3514     steps_total = 6
3515     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3516     instance = self.instance
3517     iv_names = {}
3518     vgname = self.cfg.GetVGName()
3519     # start of work
3520     cfg = self.cfg
3521     tgt_node = self.tgt_node
3522     oth_node = self.oth_node
3523
3524     # Step: check device activation
3525     self.proc.LogStep(1, steps_total, "check device existence")
3526     info("checking volume groups")
3527     my_vg = cfg.GetVGName()
3528     results = rpc.call_vg_list([oth_node, tgt_node])
3529     if not results:
3530       raise errors.OpExecError("Can't list volume groups on the nodes")
3531     for node in oth_node, tgt_node:
3532       res = results.get(node, False)
3533       if not res or my_vg not in res:
3534         raise errors.OpExecError("Volume group '%s' not found on %s" %
3535                                  (my_vg, node))
3536     for dev in instance.disks:
3537       if not dev.iv_name in self.op.disks:
3538         continue
3539       for node in tgt_node, oth_node:
3540         info("checking %s on %s" % (dev.iv_name, node))
3541         cfg.SetDiskID(dev, node)
3542         if not rpc.call_blockdev_find(node, dev):
3543           raise errors.OpExecError("Can't find device %s on node %s" %
3544                                    (dev.iv_name, node))
3545
3546     # Step: check other node consistency
3547     self.proc.LogStep(2, steps_total, "check peer consistency")
3548     for dev in instance.disks:
3549       if not dev.iv_name in self.op.disks:
3550         continue
3551       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3552       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3553                                    oth_node==instance.primary_node):
3554         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3555                                  " to replace disks on this node (%s)" %
3556                                  (oth_node, tgt_node))
3557
3558     # Step: create new storage
3559     self.proc.LogStep(3, steps_total, "allocate new storage")
3560     for dev in instance.disks:
3561       if not dev.iv_name in self.op.disks:
3562         continue
3563       size = dev.size
3564       cfg.SetDiskID(dev, tgt_node)
3565       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3566       names = _GenerateUniqueNames(cfg, lv_names)
3567       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3568                              logical_id=(vgname, names[0]))
3569       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3570                              logical_id=(vgname, names[1]))
3571       new_lvs = [lv_data, lv_meta]
3572       old_lvs = dev.children
3573       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3574       info("creating new local storage on %s for %s" %
3575            (tgt_node, dev.iv_name))
3576       # since we *always* want to create this LV, we use the
3577       # _Create...OnPrimary (which forces the creation), even if we
3578       # are talking about the secondary node
3579       for new_lv in new_lvs:
3580         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3581                                         _GetInstanceInfoText(instance)):
3582           raise errors.OpExecError("Failed to create new LV named '%s' on"
3583                                    " node '%s'" %
3584                                    (new_lv.logical_id[1], tgt_node))
3585
3586     # Step: for each lv, detach+rename*2+attach
3587     self.proc.LogStep(4, steps_total, "change drbd configuration")
3588     for dev, old_lvs, new_lvs in iv_names.itervalues():
3589       info("detaching %s drbd from local storage" % dev.iv_name)
3590       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3591         raise errors.OpExecError("Can't detach drbd from local storage on node"
3592                                  " %s for device %s" % (tgt_node, dev.iv_name))
3593       #dev.children = []
3594       #cfg.Update(instance)
3595
3596       # ok, we created the new LVs, so now we know we have the needed
3597       # storage; as such, we proceed on the target node to rename
3598       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3599       # using the assumption than logical_id == physical_id (which in
3600       # turn is the unique_id on that node)
3601
3602       # FIXME(iustin): use a better name for the replaced LVs
3603       temp_suffix = int(time.time())
3604       ren_fn = lambda d, suff: (d.physical_id[0],
3605                                 d.physical_id[1] + "_replaced-%s" % suff)
3606       # build the rename list based on what LVs exist on the node
3607       rlist = []
3608       for to_ren in old_lvs:
3609         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3610         if find_res is not None: # device exists
3611           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3612
3613       info("renaming the old LVs on the target node")
3614       if not rpc.call_blockdev_rename(tgt_node, rlist):
3615         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3616       # now we rename the new LVs to the old LVs
3617       info("renaming the new LVs on the target node")
3618       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3619       if not rpc.call_blockdev_rename(tgt_node, rlist):
3620         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3621
3622       for old, new in zip(old_lvs, new_lvs):
3623         new.logical_id = old.logical_id
3624         cfg.SetDiskID(new, tgt_node)
3625
3626       for disk in old_lvs:
3627         disk.logical_id = ren_fn(disk, temp_suffix)
3628         cfg.SetDiskID(disk, tgt_node)
3629
3630       # now that the new lvs have the old name, we can add them to the device
3631       info("adding new mirror component on %s" % tgt_node)
3632       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3633         for new_lv in new_lvs:
3634           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3635             warning("Can't rollback device %s", "manually cleanup unused"
3636                     " logical volumes")
3637         raise errors.OpExecError("Can't add local storage to drbd")
3638
3639       dev.children = new_lvs
3640       cfg.Update(instance)
3641
3642     # Step: wait for sync
3643
3644     # this can fail as the old devices are degraded and _WaitForSync
3645     # does a combined result over all disks, so we don't check its
3646     # return value
3647     self.proc.LogStep(5, steps_total, "sync devices")
3648     _WaitForSync(cfg, instance, self.proc, unlock=True)
3649
3650     # so check manually all the devices
3651     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3652       cfg.SetDiskID(dev, instance.primary_node)
3653       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3654       if is_degr:
3655         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3656
3657     # Step: remove old storage
3658     self.proc.LogStep(6, steps_total, "removing old storage")
3659     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3660       info("remove logical volumes for %s" % name)
3661       for lv in old_lvs:
3662         cfg.SetDiskID(lv, tgt_node)
3663         if not rpc.call_blockdev_remove(tgt_node, lv):
3664           warning("Can't remove old LV", "manually remove unused LVs")
3665           continue
3666
3667   def _ExecD8Secondary(self, feedback_fn):
3668     """Replace the secondary node for drbd8.
3669
3670     The algorithm for replace is quite complicated:
3671       - for all disks of the instance:
3672         - create new LVs on the new node with same names
3673         - shutdown the drbd device on the old secondary
3674         - disconnect the drbd network on the primary
3675         - create the drbd device on the new secondary
3676         - network attach the drbd on the primary, using an artifice:
3677           the drbd code for Attach() will connect to the network if it
3678           finds a device which is connected to the good local disks but
3679           not network enabled
3680       - wait for sync across all devices
3681       - remove all disks from the old secondary
3682
3683     Failures are not very well handled.
3684
3685     """
3686     steps_total = 6
3687     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3688     instance = self.instance
3689     iv_names = {}
3690     vgname = self.cfg.GetVGName()
3691     # start of work
3692     cfg = self.cfg
3693     old_node = self.tgt_node
3694     new_node = self.new_node
3695     pri_node = instance.primary_node
3696
3697     # Step: check device activation
3698     self.proc.LogStep(1, steps_total, "check device existence")
3699     info("checking volume groups")
3700     my_vg = cfg.GetVGName()
3701     results = rpc.call_vg_list([pri_node, new_node])
3702     if not results:
3703       raise errors.OpExecError("Can't list volume groups on the nodes")
3704     for node in pri_node, new_node:
3705       res = results.get(node, False)
3706       if not res or my_vg not in res:
3707         raise errors.OpExecError("Volume group '%s' not found on %s" %
3708                                  (my_vg, node))
3709     for dev in instance.disks:
3710       if not dev.iv_name in self.op.disks:
3711         continue
3712       info("checking %s on %s" % (dev.iv_name, pri_node))
3713       cfg.SetDiskID(dev, pri_node)
3714       if not rpc.call_blockdev_find(pri_node, dev):
3715         raise errors.OpExecError("Can't find device %s on node %s" %
3716                                  (dev.iv_name, pri_node))
3717
3718     # Step: check other node consistency
3719     self.proc.LogStep(2, steps_total, "check peer consistency")
3720     for dev in instance.disks:
3721       if not dev.iv_name in self.op.disks:
3722         continue
3723       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3724       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3725         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3726                                  " unsafe to replace the secondary" %
3727                                  pri_node)
3728
3729     # Step: create new storage
3730     self.proc.LogStep(3, steps_total, "allocate new storage")
3731     for dev in instance.disks:
3732       size = dev.size
3733       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3734       # since we *always* want to create this LV, we use the
3735       # _Create...OnPrimary (which forces the creation), even if we
3736       # are talking about the secondary node
3737       for new_lv in dev.children:
3738         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3739                                         _GetInstanceInfoText(instance)):
3740           raise errors.OpExecError("Failed to create new LV named '%s' on"
3741                                    " node '%s'" %
3742                                    (new_lv.logical_id[1], new_node))
3743
3744       iv_names[dev.iv_name] = (dev, dev.children)
3745
3746     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3747     for dev in instance.disks:
3748       size = dev.size
3749       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3750       # create new devices on new_node
3751       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3752                               logical_id=(pri_node, new_node,
3753                                           dev.logical_id[2]),
3754                               children=dev.children)
3755       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3756                                         new_drbd, False,
3757                                       _GetInstanceInfoText(instance)):
3758         raise errors.OpExecError("Failed to create new DRBD on"
3759                                  " node '%s'" % new_node)
3760
3761     for dev in instance.disks:
3762       # we have new devices, shutdown the drbd on the old secondary
3763       info("shutting down drbd for %s on old node" % dev.iv_name)
3764       cfg.SetDiskID(dev, old_node)
3765       if not rpc.call_blockdev_shutdown(old_node, dev):
3766         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3767                 "Please cleanup this device manuall as soon as possible")
3768
3769       # we have new storage, we 'rename' the network on the primary
3770       info("switching primary drbd for %s to new secondary node" % dev.iv_name)
3771       cfg.SetDiskID(dev, pri_node)
3772       # rename to the ip of the new node
3773       new_uid = list(dev.physical_id)
3774       new_uid[2] = self.remote_node_info.secondary_ip
3775       rlist = [(dev, tuple(new_uid))]
3776       if not rpc.call_blockdev_rename(pri_node, rlist):
3777         raise errors.OpExecError("Can't detach & re-attach drbd %s on node"
3778                                  " %s from %s to %s" %
3779                                  (dev.iv_name, pri_node, old_node, new_node))
3780       dev.logical_id = (pri_node, new_node, dev.logical_id[2])
3781       cfg.SetDiskID(dev, pri_node)
3782       cfg.Update(instance)
3783
3784
3785     # this can fail as the old devices are degraded and _WaitForSync
3786     # does a combined result over all disks, so we don't check its
3787     # return value
3788     self.proc.LogStep(5, steps_total, "sync devices")
3789     _WaitForSync(cfg, instance, self.proc, unlock=True)
3790
3791     # so check manually all the devices
3792     for name, (dev, old_lvs) in iv_names.iteritems():
3793       cfg.SetDiskID(dev, pri_node)
3794       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3795       if is_degr:
3796         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3797
3798     self.proc.LogStep(6, steps_total, "removing old storage")
3799     for name, (dev, old_lvs) in iv_names.iteritems():
3800       info("remove logical volumes for %s" % name)
3801       for lv in old_lvs:
3802         cfg.SetDiskID(lv, old_node)
3803         if not rpc.call_blockdev_remove(old_node, lv):
3804           warning("Can't remove LV on old secondary",
3805                   "Cleanup stale volumes by hand")
3806
3807   def Exec(self, feedback_fn):
3808     """Execute disk replacement.
3809
3810     This dispatches the disk replacement to the appropriate handler.
3811
3812     """
3813     instance = self.instance
3814     if instance.disk_template == constants.DT_REMOTE_RAID1:
3815       fn = self._ExecRR1
3816     elif instance.disk_template == constants.DT_DRBD8:
3817       if self.op.remote_node is None:
3818         fn = self._ExecD8DiskOnly
3819       else:
3820         fn = self._ExecD8Secondary
3821     else:
3822       raise errors.ProgrammerError("Unhandled disk replacement case")
3823     return fn(feedback_fn)
3824
3825
3826 class LUQueryInstanceData(NoHooksLU):
3827   """Query runtime instance data.
3828
3829   """
3830   _OP_REQP = ["instances"]
3831
3832   def CheckPrereq(self):
3833     """Check prerequisites.
3834
3835     This only checks the optional instance list against the existing names.
3836
3837     """
3838     if not isinstance(self.op.instances, list):
3839       raise errors.OpPrereqError("Invalid argument type 'instances'")
3840     if self.op.instances:
3841       self.wanted_instances = []
3842       names = self.op.instances
3843       for name in names:
3844         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3845         if instance is None:
3846           raise errors.OpPrereqError("No such instance name '%s'" % name)
3847       self.wanted_instances.append(instance)
3848     else:
3849       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3850                                in self.cfg.GetInstanceList()]
3851     return
3852
3853
3854   def _ComputeDiskStatus(self, instance, snode, dev):
3855     """Compute block device status.
3856
3857     """
3858     self.cfg.SetDiskID(dev, instance.primary_node)
3859     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3860     if dev.dev_type in constants.LDS_DRBD:
3861       # we change the snode then (otherwise we use the one passed in)
3862       if dev.logical_id[0] == instance.primary_node:
3863         snode = dev.logical_id[1]
3864       else:
3865         snode = dev.logical_id[0]
3866
3867     if snode:
3868       self.cfg.SetDiskID(dev, snode)
3869       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3870     else:
3871       dev_sstatus = None
3872
3873     if dev.children:
3874       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3875                       for child in dev.children]
3876     else:
3877       dev_children = []
3878
3879     data = {
3880       "iv_name": dev.iv_name,
3881       "dev_type": dev.dev_type,
3882       "logical_id": dev.logical_id,
3883       "physical_id": dev.physical_id,
3884       "pstatus": dev_pstatus,
3885       "sstatus": dev_sstatus,
3886       "children": dev_children,
3887       }
3888
3889     return data
3890
3891   def Exec(self, feedback_fn):
3892     """Gather and return data"""
3893     result = {}
3894     for instance in self.wanted_instances:
3895       remote_info = rpc.call_instance_info(instance.primary_node,
3896                                                 instance.name)
3897       if remote_info and "state" in remote_info:
3898         remote_state = "up"
3899       else:
3900         remote_state = "down"
3901       if instance.status == "down":
3902         config_state = "down"
3903       else:
3904         config_state = "up"
3905
3906       disks = [self._ComputeDiskStatus(instance, None, device)
3907                for device in instance.disks]
3908
3909       idict = {
3910         "name": instance.name,
3911         "config_state": config_state,
3912         "run_state": remote_state,
3913         "pnode": instance.primary_node,
3914         "snodes": instance.secondary_nodes,
3915         "os": instance.os,
3916         "memory": instance.memory,
3917         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3918         "disks": disks,
3919         "vcpus": instance.vcpus,
3920         }
3921
3922       result[instance.name] = idict
3923
3924     return result
3925
3926
3927 class LUSetInstanceParms(LogicalUnit):
3928   """Modifies an instances's parameters.
3929
3930   """
3931   HPATH = "instance-modify"
3932   HTYPE = constants.HTYPE_INSTANCE
3933   _OP_REQP = ["instance_name"]
3934
3935   def BuildHooksEnv(self):
3936     """Build hooks env.
3937
3938     This runs on the master, primary and secondaries.
3939
3940     """
3941     args = dict()
3942     if self.mem:
3943       args['memory'] = self.mem
3944     if self.vcpus:
3945       args['vcpus'] = self.vcpus
3946     if self.do_ip or self.do_bridge:
3947       if self.do_ip:
3948         ip = self.ip
3949       else:
3950         ip = self.instance.nics[0].ip
3951       if self.bridge:
3952         bridge = self.bridge
3953       else:
3954         bridge = self.instance.nics[0].bridge
3955       args['nics'] = [(ip, bridge)]
3956     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3957     nl = [self.sstore.GetMasterNode(),
3958           self.instance.primary_node] + list(self.instance.secondary_nodes)
3959     return env, nl, nl
3960
3961   def CheckPrereq(self):
3962     """Check prerequisites.
3963
3964     This only checks the instance list against the existing names.
3965
3966     """
3967     self.mem = getattr(self.op, "mem", None)
3968     self.vcpus = getattr(self.op, "vcpus", None)
3969     self.ip = getattr(self.op, "ip", None)
3970     self.bridge = getattr(self.op, "bridge", None)
3971     if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
3972       raise errors.OpPrereqError("No changes submitted")
3973     if self.mem is not None:
3974       try:
3975         self.mem = int(self.mem)
3976       except ValueError, err:
3977         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3978     if self.vcpus is not None:
3979       try:
3980         self.vcpus = int(self.vcpus)
3981       except ValueError, err:
3982         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3983     if self.ip is not None:
3984       self.do_ip = True
3985       if self.ip.lower() == "none":
3986         self.ip = None
3987       else:
3988         if not utils.IsValidIP(self.ip):
3989           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3990     else:
3991       self.do_ip = False
3992     self.do_bridge = (self.bridge is not None)
3993
3994     instance = self.cfg.GetInstanceInfo(
3995       self.cfg.ExpandInstanceName(self.op.instance_name))
3996     if instance is None:
3997       raise errors.OpPrereqError("No such instance name '%s'" %
3998                                  self.op.instance_name)
3999     self.op.instance_name = instance.name
4000     self.instance = instance
4001     return
4002
4003   def Exec(self, feedback_fn):
4004     """Modifies an instance.
4005
4006     All parameters take effect only at the next restart of the instance.
4007     """
4008     result = []
4009     instance = self.instance
4010     if self.mem:
4011       instance.memory = self.mem
4012       result.append(("mem", self.mem))
4013     if self.vcpus:
4014       instance.vcpus = self.vcpus
4015       result.append(("vcpus",  self.vcpus))
4016     if self.do_ip:
4017       instance.nics[0].ip = self.ip
4018       result.append(("ip", self.ip))
4019     if self.bridge:
4020       instance.nics[0].bridge = self.bridge
4021       result.append(("bridge", self.bridge))
4022
4023     self.cfg.AddInstance(instance)
4024
4025     return result
4026
4027
4028 class LUQueryExports(NoHooksLU):
4029   """Query the exports list
4030
4031   """
4032   _OP_REQP = []
4033
4034   def CheckPrereq(self):
4035     """Check that the nodelist contains only existing nodes.
4036
4037     """
4038     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4039
4040   def Exec(self, feedback_fn):
4041     """Compute the list of all the exported system images.
4042
4043     Returns:
4044       a dictionary with the structure node->(export-list)
4045       where export-list is a list of the instances exported on
4046       that node.
4047
4048     """
4049     return rpc.call_export_list(self.nodes)
4050
4051
4052 class LUExportInstance(LogicalUnit):
4053   """Export an instance to an image in the cluster.
4054
4055   """
4056   HPATH = "instance-export"
4057   HTYPE = constants.HTYPE_INSTANCE
4058   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4059
4060   def BuildHooksEnv(self):
4061     """Build hooks env.
4062
4063     This will run on the master, primary node and target node.
4064
4065     """
4066     env = {
4067       "EXPORT_NODE": self.op.target_node,
4068       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4069       }
4070     env.update(_BuildInstanceHookEnvByObject(self.instance))
4071     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4072           self.op.target_node]
4073     return env, nl, nl
4074
4075   def CheckPrereq(self):
4076     """Check prerequisites.
4077
4078     This checks that the instance name is a valid one.
4079
4080     """
4081     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4082     self.instance = self.cfg.GetInstanceInfo(instance_name)
4083     if self.instance is None:
4084       raise errors.OpPrereqError("Instance '%s' not found" %
4085                                  self.op.instance_name)
4086
4087     # node verification
4088     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4089     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4090
4091     if self.dst_node is None:
4092       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4093                                  self.op.target_node)
4094     self.op.target_node = self.dst_node.name
4095
4096   def Exec(self, feedback_fn):
4097     """Export an instance to an image in the cluster.
4098
4099     """
4100     instance = self.instance
4101     dst_node = self.dst_node
4102     src_node = instance.primary_node
4103     # shutdown the instance, unless requested not to do so
4104     if self.op.shutdown:
4105       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4106       self.proc.ChainOpCode(op)
4107
4108     vgname = self.cfg.GetVGName()
4109
4110     snap_disks = []
4111
4112     try:
4113       for disk in instance.disks:
4114         if disk.iv_name == "sda":
4115           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4116           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4117
4118           if not new_dev_name:
4119             logger.Error("could not snapshot block device %s on node %s" %
4120                          (disk.logical_id[1], src_node))
4121           else:
4122             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4123                                       logical_id=(vgname, new_dev_name),
4124                                       physical_id=(vgname, new_dev_name),
4125                                       iv_name=disk.iv_name)
4126             snap_disks.append(new_dev)
4127
4128     finally:
4129       if self.op.shutdown:
4130         op = opcodes.OpStartupInstance(instance_name=instance.name,
4131                                        force=False)
4132         self.proc.ChainOpCode(op)
4133
4134     # TODO: check for size
4135
4136     for dev in snap_disks:
4137       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4138                                            instance):
4139         logger.Error("could not export block device %s from node"
4140                      " %s to node %s" %
4141                      (dev.logical_id[1], src_node, dst_node.name))
4142       if not rpc.call_blockdev_remove(src_node, dev):
4143         logger.Error("could not remove snapshot block device %s from"
4144                      " node %s" % (dev.logical_id[1], src_node))
4145
4146     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4147       logger.Error("could not finalize export for instance %s on node %s" %
4148                    (instance.name, dst_node.name))
4149
4150     nodelist = self.cfg.GetNodeList()
4151     nodelist.remove(dst_node.name)
4152
4153     # on one-node clusters nodelist will be empty after the removal
4154     # if we proceed the backup would be removed because OpQueryExports
4155     # substitutes an empty list with the full cluster node list.
4156     if nodelist:
4157       op = opcodes.OpQueryExports(nodes=nodelist)
4158       exportlist = self.proc.ChainOpCode(op)
4159       for node in exportlist:
4160         if instance.name in exportlist[node]:
4161           if not rpc.call_export_remove(node, instance.name):
4162             logger.Error("could not remove older export for instance %s"
4163                          " on node %s" % (instance.name, node))
4164
4165
4166 class TagsLU(NoHooksLU):
4167   """Generic tags LU.
4168
4169   This is an abstract class which is the parent of all the other tags LUs.
4170
4171   """
4172   def CheckPrereq(self):
4173     """Check prerequisites.
4174
4175     """
4176     if self.op.kind == constants.TAG_CLUSTER:
4177       self.target = self.cfg.GetClusterInfo()
4178     elif self.op.kind == constants.TAG_NODE:
4179       name = self.cfg.ExpandNodeName(self.op.name)
4180       if name is None:
4181         raise errors.OpPrereqError("Invalid node name (%s)" %
4182                                    (self.op.name,))
4183       self.op.name = name
4184       self.target = self.cfg.GetNodeInfo(name)
4185     elif self.op.kind == constants.TAG_INSTANCE:
4186       name = self.cfg.ExpandInstanceName(self.op.name)
4187       if name is None:
4188         raise errors.OpPrereqError("Invalid instance name (%s)" %
4189                                    (self.op.name,))
4190       self.op.name = name
4191       self.target = self.cfg.GetInstanceInfo(name)
4192     else:
4193       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4194                                  str(self.op.kind))
4195
4196
4197 class LUGetTags(TagsLU):
4198   """Returns the tags of a given object.
4199
4200   """
4201   _OP_REQP = ["kind", "name"]
4202
4203   def Exec(self, feedback_fn):
4204     """Returns the tag list.
4205
4206     """
4207     return self.target.GetTags()
4208
4209
4210 class LUSearchTags(NoHooksLU):
4211   """Searches the tags for a given pattern.
4212
4213   """
4214   _OP_REQP = ["pattern"]
4215
4216   def CheckPrereq(self):
4217     """Check prerequisites.
4218
4219     This checks the pattern passed for validity by compiling it.
4220
4221     """
4222     try:
4223       self.re = re.compile(self.op.pattern)
4224     except re.error, err:
4225       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4226                                  (self.op.pattern, err))
4227
4228   def Exec(self, feedback_fn):
4229     """Returns the tag list.
4230
4231     """
4232     cfg = self.cfg
4233     tgts = [("/cluster", cfg.GetClusterInfo())]
4234     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4235     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4236     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4237     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4238     results = []
4239     for path, target in tgts:
4240       for tag in target.GetTags():
4241         if self.re.search(tag):
4242           results.append((path, tag))
4243     return results
4244
4245
4246 class LUAddTags(TagsLU):
4247   """Sets a tag on a given object.
4248
4249   """
4250   _OP_REQP = ["kind", "name", "tags"]
4251
4252   def CheckPrereq(self):
4253     """Check prerequisites.
4254
4255     This checks the type and length of the tag name and value.
4256
4257     """
4258     TagsLU.CheckPrereq(self)
4259     for tag in self.op.tags:
4260       objects.TaggableObject.ValidateTag(tag)
4261
4262   def Exec(self, feedback_fn):
4263     """Sets the tag.
4264
4265     """
4266     try:
4267       for tag in self.op.tags:
4268         self.target.AddTag(tag)
4269     except errors.TagError, err:
4270       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4271     try:
4272       self.cfg.Update(self.target)
4273     except errors.ConfigurationError:
4274       raise errors.OpRetryError("There has been a modification to the"
4275                                 " config file and the operation has been"
4276                                 " aborted. Please retry.")
4277
4278
4279 class LUDelTags(TagsLU):
4280   """Delete a list of tags from a given object.
4281
4282   """
4283   _OP_REQP = ["kind", "name", "tags"]
4284
4285   def CheckPrereq(self):
4286     """Check prerequisites.
4287
4288     This checks that we have the given tag.
4289
4290     """
4291     TagsLU.CheckPrereq(self)
4292     for tag in self.op.tags:
4293       objects.TaggableObject.ValidateTag(tag)
4294     del_tags = frozenset(self.op.tags)
4295     cur_tags = self.target.GetTags()
4296     if not del_tags <= cur_tags:
4297       diff_tags = del_tags - cur_tags
4298       diff_names = ["'%s'" % tag for tag in diff_tags]
4299       diff_names.sort()
4300       raise errors.OpPrereqError("Tag(s) %s not found" %
4301                                  (",".join(diff_names)))
4302
4303   def Exec(self, feedback_fn):
4304     """Remove the tag from the object.
4305
4306     """
4307     for tag in self.op.tags:
4308       self.target.RemoveTag(tag)
4309     try:
4310       self.cfg.Update(self.target)
4311     except errors.ConfigurationError:
4312       raise errors.OpRetryError("There has been a modification to the"
4313                                 " config file and the operation has been"
4314                                 " aborted. Please retry.")