96e028d432be5ce41c9989efd1b2363d9a67080d
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement CheckPrereq which also fills in the opcode instance
52       with all the fields (even if as None)
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements (REQ_CLUSTER,
57       REQ_MASTER); note that all commands require root permissions
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_CLUSTER = True
64   REQ_MASTER = True
65
66   def __init__(self, processor, op, cfg, sstore):
67     """Constructor for LogicalUnit.
68
69     This needs to be overriden in derived classes in order to check op
70     validity.
71
72     """
73     self.proc = processor
74     self.op = op
75     self.cfg = cfg
76     self.sstore = sstore
77     self.__ssh = None
78
79     for attr_name in self._OP_REQP:
80       attr_val = getattr(op, attr_name, None)
81       if attr_val is None:
82         raise errors.OpPrereqError("Required parameter '%s' missing" %
83                                    attr_name)
84     if self.REQ_CLUSTER:
85       if not cfg.IsCluster():
86         raise errors.OpPrereqError("Cluster not initialized yet,"
87                                    " use 'gnt-cluster init' first.")
88       if self.REQ_MASTER:
89         master = sstore.GetMasterNode()
90         if master != utils.HostInfo().name:
91           raise errors.OpPrereqError("Commands must be run on the master"
92                                      " node %s" % master)
93
94   def __GetSSH(self):
95     """Returns the SshRunner object
96
97     """
98     if not self.__ssh:
99       self.__ssh = ssh.SshRunner(self.sstore)
100     return self.__ssh
101
102   ssh = property(fget=__GetSSH)
103
104   def CheckPrereq(self):
105     """Check prerequisites for this LU.
106
107     This method should check that the prerequisites for the execution
108     of this LU are fulfilled. It can do internode communication, but
109     it should be idempotent - no cluster or system changes are
110     allowed.
111
112     The method should raise errors.OpPrereqError in case something is
113     not fulfilled. Its return value is ignored.
114
115     This method should also update all the parameters of the opcode to
116     their canonical form; e.g. a short node name must be fully
117     expanded after this method has successfully completed (so that
118     hooks, logging, etc. work correctly).
119
120     """
121     raise NotImplementedError
122
123   def Exec(self, feedback_fn):
124     """Execute the LU.
125
126     This method should implement the actual work. It should raise
127     errors.OpExecError for failures that are somewhat dealt with in
128     code, or expected.
129
130     """
131     raise NotImplementedError
132
133   def BuildHooksEnv(self):
134     """Build hooks environment for this LU.
135
136     This method should return a three-node tuple consisting of: a dict
137     containing the environment that will be used for running the
138     specific hook for this LU, a list of node names on which the hook
139     should run before the execution, and a list of node names on which
140     the hook should run after the execution.
141
142     The keys of the dict must not have 'GANETI_' prefixed as this will
143     be handled in the hooks runner. Also note additional keys will be
144     added by the hooks runner. If the LU doesn't define any
145     environment, an empty dict (and not None) should be returned.
146
147     As for the node lists, the master should not be included in the
148     them, as it will be added by the hooks runner in case this LU
149     requires a cluster to run on (otherwise we don't have a node
150     list). No nodes should be returned as an empty list (and not
151     None).
152
153     Note that if the HPATH for a LU class is None, this function will
154     not be called.
155
156     """
157     raise NotImplementedError
158
159
160 class NoHooksLU(LogicalUnit):
161   """Simple LU which runs no hooks.
162
163   This LU is intended as a parent for other LogicalUnits which will
164   run no hooks, in order to reduce duplicate code.
165
166   """
167   HPATH = None
168   HTYPE = None
169
170   def BuildHooksEnv(self):
171     """Build hooks env.
172
173     This is a no-op, since we don't run hooks.
174
175     """
176     return {}, [], []
177
178
179 def _AddHostToEtcHosts(hostname):
180   """Wrapper around utils.SetEtcHostsEntry.
181
182   """
183   hi = utils.HostInfo(name=hostname)
184   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
185
186
187 def _RemoveHostFromEtcHosts(hostname):
188   """Wrapper around utils.RemoveEtcHostsEntry.
189
190   """
191   hi = utils.HostInfo(name=hostname)
192   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
193   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
194
195
196 def _GetWantedNodes(lu, nodes):
197   """Returns list of checked and expanded node names.
198
199   Args:
200     nodes: List of nodes (strings) or None for all
201
202   """
203   if not isinstance(nodes, list):
204     raise errors.OpPrereqError("Invalid argument type 'nodes'")
205
206   if nodes:
207     wanted = []
208
209     for name in nodes:
210       node = lu.cfg.ExpandNodeName(name)
211       if node is None:
212         raise errors.OpPrereqError("No such node name '%s'" % name)
213       wanted.append(node)
214
215   else:
216     wanted = lu.cfg.GetNodeList()
217   return utils.NiceSort(wanted)
218
219
220 def _GetWantedInstances(lu, instances):
221   """Returns list of checked and expanded instance names.
222
223   Args:
224     instances: List of instances (strings) or None for all
225
226   """
227   if not isinstance(instances, list):
228     raise errors.OpPrereqError("Invalid argument type 'instances'")
229
230   if instances:
231     wanted = []
232
233     for name in instances:
234       instance = lu.cfg.ExpandInstanceName(name)
235       if instance is None:
236         raise errors.OpPrereqError("No such instance name '%s'" % name)
237       wanted.append(instance)
238
239   else:
240     wanted = lu.cfg.GetInstanceList()
241   return utils.NiceSort(wanted)
242
243
244 def _CheckOutputFields(static, dynamic, selected):
245   """Checks whether all selected fields are valid.
246
247   Args:
248     static: Static fields
249     dynamic: Dynamic fields
250
251   """
252   static_fields = frozenset(static)
253   dynamic_fields = frozenset(dynamic)
254
255   all_fields = static_fields | dynamic_fields
256
257   if not all_fields.issuperset(selected):
258     raise errors.OpPrereqError("Unknown output fields selected: %s"
259                                % ",".join(frozenset(selected).
260                                           difference(all_fields)))
261
262
263 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
264                           memory, vcpus, nics):
265   """Builds instance related env variables for hooks from single variables.
266
267   Args:
268     secondary_nodes: List of secondary nodes as strings
269   """
270   env = {
271     "OP_TARGET": name,
272     "INSTANCE_NAME": name,
273     "INSTANCE_PRIMARY": primary_node,
274     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
275     "INSTANCE_OS_TYPE": os_type,
276     "INSTANCE_STATUS": status,
277     "INSTANCE_MEMORY": memory,
278     "INSTANCE_VCPUS": vcpus,
279   }
280
281   if nics:
282     nic_count = len(nics)
283     for idx, (ip, bridge, mac) in enumerate(nics):
284       if ip is None:
285         ip = ""
286       env["INSTANCE_NIC%d_IP" % idx] = ip
287       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
288       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
289   else:
290     nic_count = 0
291
292   env["INSTANCE_NIC_COUNT"] = nic_count
293
294   return env
295
296
297 def _BuildInstanceHookEnvByObject(instance, override=None):
298   """Builds instance related env variables for hooks from an object.
299
300   Args:
301     instance: objects.Instance object of instance
302     override: dict of values to override
303   """
304   args = {
305     'name': instance.name,
306     'primary_node': instance.primary_node,
307     'secondary_nodes': instance.secondary_nodes,
308     'os_type': instance.os,
309     'status': instance.os,
310     'memory': instance.memory,
311     'vcpus': instance.vcpus,
312     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
313   }
314   if override:
315     args.update(override)
316   return _BuildInstanceHookEnv(**args)
317
318
319 def _HasValidVG(vglist, vgname):
320   """Checks if the volume group list is valid.
321
322   A non-None return value means there's an error, and the return value
323   is the error message.
324
325   """
326   vgsize = vglist.get(vgname, None)
327   if vgsize is None:
328     return "volume group '%s' missing" % vgname
329   elif vgsize < 20480:
330     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
331             (vgname, vgsize))
332   return None
333
334
335 def _InitSSHSetup(node):
336   """Setup the SSH configuration for the cluster.
337
338
339   This generates a dsa keypair for root, adds the pub key to the
340   permitted hosts and adds the hostkey to its own known hosts.
341
342   Args:
343     node: the name of this host as a fqdn
344
345   """
346   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
347
348   for name in priv_key, pub_key:
349     if os.path.exists(name):
350       utils.CreateBackup(name)
351     utils.RemoveFile(name)
352
353   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
354                          "-f", priv_key,
355                          "-q", "-N", ""])
356   if result.failed:
357     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
358                              result.output)
359
360   f = open(pub_key, 'r')
361   try:
362     utils.AddAuthorizedKey(auth_keys, f.read(8192))
363   finally:
364     f.close()
365
366
367 def _InitGanetiServerSetup(ss):
368   """Setup the necessary configuration for the initial node daemon.
369
370   This creates the nodepass file containing the shared password for
371   the cluster and also generates the SSL certificate.
372
373   """
374   # Create pseudo random password
375   randpass = sha.new(os.urandom(64)).hexdigest()
376   # and write it into sstore
377   ss.SetKey(ss.SS_NODED_PASS, randpass)
378
379   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
380                          "-days", str(365*5), "-nodes", "-x509",
381                          "-keyout", constants.SSL_CERT_FILE,
382                          "-out", constants.SSL_CERT_FILE, "-batch"])
383   if result.failed:
384     raise errors.OpExecError("could not generate server ssl cert, command"
385                              " %s had exitcode %s and error message %s" %
386                              (result.cmd, result.exit_code, result.output))
387
388   os.chmod(constants.SSL_CERT_FILE, 0400)
389
390   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
391
392   if result.failed:
393     raise errors.OpExecError("Could not start the node daemon, command %s"
394                              " had exitcode %s and error %s" %
395                              (result.cmd, result.exit_code, result.output))
396
397
398 def _CheckInstanceBridgesExist(instance):
399   """Check that the brigdes needed by an instance exist.
400
401   """
402   # check bridges existance
403   brlist = [nic.bridge for nic in instance.nics]
404   if not rpc.call_bridges_exist(instance.primary_node, brlist):
405     raise errors.OpPrereqError("one or more target bridges %s does not"
406                                " exist on destination node '%s'" %
407                                (brlist, instance.primary_node))
408
409
410 class LUInitCluster(LogicalUnit):
411   """Initialise the cluster.
412
413   """
414   HPATH = "cluster-init"
415   HTYPE = constants.HTYPE_CLUSTER
416   _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
417               "def_bridge", "master_netdev", "file_storage_dir"]
418   REQ_CLUSTER = False
419
420   def BuildHooksEnv(self):
421     """Build hooks env.
422
423     Notes: Since we don't require a cluster, we must manually add
424     ourselves in the post-run node list.
425
426     """
427     env = {"OP_TARGET": self.op.cluster_name}
428     return env, [], [self.hostname.name]
429
430   def CheckPrereq(self):
431     """Verify that the passed name is a valid one.
432
433     """
434     if config.ConfigWriter.IsCluster():
435       raise errors.OpPrereqError("Cluster is already initialised")
436
437     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
438       if not os.path.exists(constants.VNC_PASSWORD_FILE):
439         raise errors.OpPrereqError("Please prepare the cluster VNC"
440                                    "password file %s" %
441                                    constants.VNC_PASSWORD_FILE)
442
443     self.hostname = hostname = utils.HostInfo()
444
445     if hostname.ip.startswith("127."):
446       raise errors.OpPrereqError("This host's IP resolves to the private"
447                                  " range (%s). Please fix DNS or %s." %
448                                  (hostname.ip, constants.ETC_HOSTS))
449
450     if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
451                          source=constants.LOCALHOST_IP_ADDRESS):
452       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
453                                  " to %s,\nbut this ip address does not"
454                                  " belong to this host."
455                                  " Aborting." % hostname.ip)
456
457     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
458
459     if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
460                      timeout=5):
461       raise errors.OpPrereqError("Cluster IP already active. Aborting.")
462
463     secondary_ip = getattr(self.op, "secondary_ip", None)
464     if secondary_ip and not utils.IsValidIP(secondary_ip):
465       raise errors.OpPrereqError("Invalid secondary ip given")
466     if (secondary_ip and
467         secondary_ip != hostname.ip and
468         (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
469                            source=constants.LOCALHOST_IP_ADDRESS))):
470       raise errors.OpPrereqError("You gave %s as secondary IP,"
471                                  " but it does not belong to this host." %
472                                  secondary_ip)
473     self.secondary_ip = secondary_ip
474
475     if not hasattr(self.op, "vg_name"):
476       self.op.vg_name = None
477     # if vg_name not None, checks if volume group is valid
478     if self.op.vg_name:
479       vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
480       if vgstatus:
481         raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
482                                    " you are not using lvm" % vgstatus)
483
484     self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
485
486     if not os.path.isabs(self.op.file_storage_dir):
487       raise errors.OpPrereqError("The file storage directory you have is"
488                                  " not an absolute path.")
489
490     if not os.path.exists(self.op.file_storage_dir):
491       try:
492         os.makedirs(self.op.file_storage_dir, 0750)
493       except OSError, err:
494         raise errors.OpPrereqError("Cannot create file storage directory"
495                                    " '%s': %s" %
496                                    (self.op.file_storage_dir, err))
497
498     if not os.path.isdir(self.op.file_storage_dir):
499       raise errors.OpPrereqError("The file storage directory '%s' is not"
500                                  " a directory." % self.op.file_storage_dir)
501
502     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
503                     self.op.mac_prefix):
504       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
505                                  self.op.mac_prefix)
506
507     if self.op.hypervisor_type not in constants.HYPER_TYPES:
508       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
509                                  self.op.hypervisor_type)
510
511     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
512     if result.failed:
513       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
514                                  (self.op.master_netdev,
515                                   result.output.strip()))
516
517     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
518             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
519       raise errors.OpPrereqError("Init.d script '%s' missing or not"
520                                  " executable." % constants.NODE_INITD_SCRIPT)
521
522   def Exec(self, feedback_fn):
523     """Initialize the cluster.
524
525     """
526     clustername = self.clustername
527     hostname = self.hostname
528
529     # set up the simple store
530     self.sstore = ss = ssconf.SimpleStore()
531     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
532     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
533     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
534     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
535     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
536     ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
537
538     # set up the inter-node password and certificate
539     _InitGanetiServerSetup(ss)
540
541     # start the master ip
542     rpc.call_node_start_master(hostname.name)
543
544     # set up ssh config and /etc/hosts
545     f = open(constants.SSH_HOST_RSA_PUB, 'r')
546     try:
547       sshline = f.read()
548     finally:
549       f.close()
550     sshkey = sshline.split(" ")[1]
551
552     _AddHostToEtcHosts(hostname.name)
553     _InitSSHSetup(hostname.name)
554
555     # init of cluster config file
556     self.cfg = cfgw = config.ConfigWriter()
557     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
558                     sshkey, self.op.mac_prefix,
559                     self.op.vg_name, self.op.def_bridge)
560
561     ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
562
563
564 class LUDestroyCluster(NoHooksLU):
565   """Logical unit for destroying the cluster.
566
567   """
568   _OP_REQP = []
569
570   def CheckPrereq(self):
571     """Check prerequisites.
572
573     This checks whether the cluster is empty.
574
575     Any errors are signalled by raising errors.OpPrereqError.
576
577     """
578     master = self.sstore.GetMasterNode()
579
580     nodelist = self.cfg.GetNodeList()
581     if len(nodelist) != 1 or nodelist[0] != master:
582       raise errors.OpPrereqError("There are still %d node(s) in"
583                                  " this cluster." % (len(nodelist) - 1))
584     instancelist = self.cfg.GetInstanceList()
585     if instancelist:
586       raise errors.OpPrereqError("There are still %d instance(s) in"
587                                  " this cluster." % len(instancelist))
588
589   def Exec(self, feedback_fn):
590     """Destroys the cluster.
591
592     """
593     master = self.sstore.GetMasterNode()
594     if not rpc.call_node_stop_master(master):
595       raise errors.OpExecError("Could not disable the master role")
596     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
597     utils.CreateBackup(priv_key)
598     utils.CreateBackup(pub_key)
599     rpc.call_node_leave_cluster(master)
600
601
602 class LUVerifyCluster(NoHooksLU):
603   """Verifies the cluster status.
604
605   """
606   _OP_REQP = []
607
608   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
609                   remote_version, feedback_fn):
610     """Run multiple tests against a node.
611
612     Test list:
613       - compares ganeti version
614       - checks vg existance and size > 20G
615       - checks config file checksum
616       - checks ssh to other nodes
617
618     Args:
619       node: name of the node to check
620       file_list: required list of files
621       local_cksum: dictionary of local files and their checksums
622
623     """
624     # compares ganeti version
625     local_version = constants.PROTOCOL_VERSION
626     if not remote_version:
627       feedback_fn(" - ERROR: connection to %s failed" % (node))
628       return True
629
630     if local_version != remote_version:
631       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
632                       (local_version, node, remote_version))
633       return True
634
635     # checks vg existance and size > 20G
636
637     bad = False
638     if not vglist:
639       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
640                       (node,))
641       bad = True
642     else:
643       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
644       if vgstatus:
645         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
646         bad = True
647
648     # checks config file checksum
649     # checks ssh to any
650
651     if 'filelist' not in node_result:
652       bad = True
653       feedback_fn("  - ERROR: node hasn't returned file checksum data")
654     else:
655       remote_cksum = node_result['filelist']
656       for file_name in file_list:
657         if file_name not in remote_cksum:
658           bad = True
659           feedback_fn("  - ERROR: file '%s' missing" % file_name)
660         elif remote_cksum[file_name] != local_cksum[file_name]:
661           bad = True
662           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
663
664     if 'nodelist' not in node_result:
665       bad = True
666       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
667     else:
668       if node_result['nodelist']:
669         bad = True
670         for node in node_result['nodelist']:
671           feedback_fn("  - ERROR: communication with node '%s': %s" %
672                           (node, node_result['nodelist'][node]))
673     hyp_result = node_result.get('hypervisor', None)
674     if hyp_result is not None:
675       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
676     return bad
677
678   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
679     """Verify an instance.
680
681     This function checks to see if the required block devices are
682     available on the instance's node.
683
684     """
685     bad = False
686
687     instancelist = self.cfg.GetInstanceList()
688     if not instance in instancelist:
689       feedback_fn("  - ERROR: instance %s not in instance list %s" %
690                       (instance, instancelist))
691       bad = True
692
693     instanceconfig = self.cfg.GetInstanceInfo(instance)
694     node_current = instanceconfig.primary_node
695
696     node_vol_should = {}
697     instanceconfig.MapLVsByNode(node_vol_should)
698
699     for node in node_vol_should:
700       for volume in node_vol_should[node]:
701         if node not in node_vol_is or volume not in node_vol_is[node]:
702           feedback_fn("  - ERROR: volume %s missing on node %s" %
703                           (volume, node))
704           bad = True
705
706     if not instanceconfig.status == 'down':
707       if not instance in node_instance[node_current]:
708         feedback_fn("  - ERROR: instance %s not running on node %s" %
709                         (instance, node_current))
710         bad = True
711
712     for node in node_instance:
713       if (not node == node_current):
714         if instance in node_instance[node]:
715           feedback_fn("  - ERROR: instance %s should not run on node %s" %
716                           (instance, node))
717           bad = True
718
719     return bad
720
721   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
722     """Verify if there are any unknown volumes in the cluster.
723
724     The .os, .swap and backup volumes are ignored. All other volumes are
725     reported as unknown.
726
727     """
728     bad = False
729
730     for node in node_vol_is:
731       for volume in node_vol_is[node]:
732         if node not in node_vol_should or volume not in node_vol_should[node]:
733           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
734                       (volume, node))
735           bad = True
736     return bad
737
738   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
739     """Verify the list of running instances.
740
741     This checks what instances are running but unknown to the cluster.
742
743     """
744     bad = False
745     for node in node_instance:
746       for runninginstance in node_instance[node]:
747         if runninginstance not in instancelist:
748           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
749                           (runninginstance, node))
750           bad = True
751     return bad
752
753   def CheckPrereq(self):
754     """Check prerequisites.
755
756     This has no prerequisites.
757
758     """
759     pass
760
761   def Exec(self, feedback_fn):
762     """Verify integrity of cluster, performing various test on nodes.
763
764     """
765     bad = False
766     feedback_fn("* Verifying global settings")
767     for msg in self.cfg.VerifyConfig():
768       feedback_fn("  - ERROR: %s" % msg)
769
770     vg_name = self.cfg.GetVGName()
771     nodelist = utils.NiceSort(self.cfg.GetNodeList())
772     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
773     node_volume = {}
774     node_instance = {}
775
776     # FIXME: verify OS list
777     # do local checksums
778     file_names = list(self.sstore.GetFileList())
779     file_names.append(constants.SSL_CERT_FILE)
780     file_names.append(constants.CLUSTER_CONF_FILE)
781     local_checksums = utils.FingerprintFiles(file_names)
782
783     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
784     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
785     all_instanceinfo = rpc.call_instance_list(nodelist)
786     all_vglist = rpc.call_vg_list(nodelist)
787     node_verify_param = {
788       'filelist': file_names,
789       'nodelist': nodelist,
790       'hypervisor': None,
791       }
792     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
793     all_rversion = rpc.call_version(nodelist)
794
795     for node in nodelist:
796       feedback_fn("* Verifying node %s" % node)
797       result = self._VerifyNode(node, file_names, local_checksums,
798                                 all_vglist[node], all_nvinfo[node],
799                                 all_rversion[node], feedback_fn)
800       bad = bad or result
801
802       # node_volume
803       volumeinfo = all_volumeinfo[node]
804
805       if isinstance(volumeinfo, basestring):
806         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
807                     (node, volumeinfo[-400:].encode('string_escape')))
808         bad = True
809         node_volume[node] = {}
810       elif not isinstance(volumeinfo, dict):
811         feedback_fn("  - ERROR: connection to %s failed" % (node,))
812         bad = True
813         continue
814       else:
815         node_volume[node] = volumeinfo
816
817       # node_instance
818       nodeinstance = all_instanceinfo[node]
819       if type(nodeinstance) != list:
820         feedback_fn("  - ERROR: connection to %s failed" % (node,))
821         bad = True
822         continue
823
824       node_instance[node] = nodeinstance
825
826     node_vol_should = {}
827
828     for instance in instancelist:
829       feedback_fn("* Verifying instance %s" % instance)
830       result =  self._VerifyInstance(instance, node_volume, node_instance,
831                                      feedback_fn)
832       bad = bad or result
833
834       inst_config = self.cfg.GetInstanceInfo(instance)
835
836       inst_config.MapLVsByNode(node_vol_should)
837
838     feedback_fn("* Verifying orphan volumes")
839     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
840                                        feedback_fn)
841     bad = bad or result
842
843     feedback_fn("* Verifying remaining instances")
844     result = self._VerifyOrphanInstances(instancelist, node_instance,
845                                          feedback_fn)
846     bad = bad or result
847
848     return int(bad)
849
850
851 class LUVerifyDisks(NoHooksLU):
852   """Verifies the cluster disks status.
853
854   """
855   _OP_REQP = []
856
857   def CheckPrereq(self):
858     """Check prerequisites.
859
860     This has no prerequisites.
861
862     """
863     pass
864
865   def Exec(self, feedback_fn):
866     """Verify integrity of cluster disks.
867
868     """
869     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
870
871     vg_name = self.cfg.GetVGName()
872     nodes = utils.NiceSort(self.cfg.GetNodeList())
873     instances = [self.cfg.GetInstanceInfo(name)
874                  for name in self.cfg.GetInstanceList()]
875
876     nv_dict = {}
877     for inst in instances:
878       inst_lvs = {}
879       if (inst.status != "up" or
880           inst.disk_template not in constants.DTS_NET_MIRROR):
881         continue
882       inst.MapLVsByNode(inst_lvs)
883       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
884       for node, vol_list in inst_lvs.iteritems():
885         for vol in vol_list:
886           nv_dict[(node, vol)] = inst
887
888     if not nv_dict:
889       return result
890
891     node_lvs = rpc.call_volume_list(nodes, vg_name)
892
893     to_act = set()
894     for node in nodes:
895       # node_volume
896       lvs = node_lvs[node]
897
898       if isinstance(lvs, basestring):
899         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
900         res_nlvm[node] = lvs
901       elif not isinstance(lvs, dict):
902         logger.Info("connection to node %s failed or invalid data returned" %
903                     (node,))
904         res_nodes.append(node)
905         continue
906
907       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
908         inst = nv_dict.pop((node, lv_name), None)
909         if (not lv_online and inst is not None
910             and inst.name not in res_instances):
911           res_instances.append(inst.name)
912
913     # any leftover items in nv_dict are missing LVs, let's arrange the
914     # data better
915     for key, inst in nv_dict.iteritems():
916       if inst.name not in res_missing:
917         res_missing[inst.name] = []
918       res_missing[inst.name].append(key)
919
920     return result
921
922
923 class LURenameCluster(LogicalUnit):
924   """Rename the cluster.
925
926   """
927   HPATH = "cluster-rename"
928   HTYPE = constants.HTYPE_CLUSTER
929   _OP_REQP = ["name"]
930
931   def BuildHooksEnv(self):
932     """Build hooks env.
933
934     """
935     env = {
936       "OP_TARGET": self.sstore.GetClusterName(),
937       "NEW_NAME": self.op.name,
938       }
939     mn = self.sstore.GetMasterNode()
940     return env, [mn], [mn]
941
942   def CheckPrereq(self):
943     """Verify that the passed name is a valid one.
944
945     """
946     hostname = utils.HostInfo(self.op.name)
947
948     new_name = hostname.name
949     self.ip = new_ip = hostname.ip
950     old_name = self.sstore.GetClusterName()
951     old_ip = self.sstore.GetMasterIP()
952     if new_name == old_name and new_ip == old_ip:
953       raise errors.OpPrereqError("Neither the name nor the IP address of the"
954                                  " cluster has changed")
955     if new_ip != old_ip:
956       result = utils.RunCmd(["fping", "-q", new_ip])
957       if not result.failed:
958         raise errors.OpPrereqError("The given cluster IP address (%s) is"
959                                    " reachable on the network. Aborting." %
960                                    new_ip)
961
962     self.op.name = new_name
963
964   def Exec(self, feedback_fn):
965     """Rename the cluster.
966
967     """
968     clustername = self.op.name
969     ip = self.ip
970     ss = self.sstore
971
972     # shutdown the master IP
973     master = ss.GetMasterNode()
974     if not rpc.call_node_stop_master(master):
975       raise errors.OpExecError("Could not disable the master role")
976
977     try:
978       # modify the sstore
979       ss.SetKey(ss.SS_MASTER_IP, ip)
980       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
981
982       # Distribute updated ss config to all nodes
983       myself = self.cfg.GetNodeInfo(master)
984       dist_nodes = self.cfg.GetNodeList()
985       if myself.name in dist_nodes:
986         dist_nodes.remove(myself.name)
987
988       logger.Debug("Copying updated ssconf data to all nodes")
989       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
990         fname = ss.KeyToFilename(keyname)
991         result = rpc.call_upload_file(dist_nodes, fname)
992         for to_node in dist_nodes:
993           if not result[to_node]:
994             logger.Error("copy of file %s to node %s failed" %
995                          (fname, to_node))
996     finally:
997       if not rpc.call_node_start_master(master):
998         logger.Error("Could not re-enable the master role on the master,"
999                      " please restart manually.")
1000
1001
1002 def _RecursiveCheckIfLVMBased(disk):
1003   """Check if the given disk or its children are lvm-based.
1004
1005   Args:
1006     disk: ganeti.objects.Disk object
1007
1008   Returns:
1009     boolean indicating whether a LD_LV dev_type was found or not
1010
1011   """
1012   if disk.children:
1013     for chdisk in disk.children:
1014       if _RecursiveCheckIfLVMBased(chdisk):
1015         return True
1016   return disk.dev_type == constants.LD_LV
1017
1018
1019 class LUSetClusterParams(LogicalUnit):
1020   """Change the parameters of the cluster.
1021
1022   """
1023   HPATH = "cluster-modify"
1024   HTYPE = constants.HTYPE_CLUSTER
1025   _OP_REQP = []
1026
1027   def BuildHooksEnv(self):
1028     """Build hooks env.
1029
1030     """
1031     env = {
1032       "OP_TARGET": self.sstore.GetClusterName(),
1033       "NEW_VG_NAME": self.op.vg_name,
1034       }
1035     mn = self.sstore.GetMasterNode()
1036     return env, [mn], [mn]
1037
1038   def CheckPrereq(self):
1039     """Check prerequisites.
1040
1041     This checks whether the given params don't conflict and
1042     if the given volume group is valid.
1043
1044     """
1045     if not self.op.vg_name:
1046       instances = [self.cfg.GetInstanceInfo(name)
1047                    for name in self.cfg.GetInstanceList()]
1048       for inst in instances:
1049         for disk in inst.disks:
1050           if _RecursiveCheckIfLVMBased(disk):
1051             raise errors.OpPrereqError("Cannot disable lvm storage while"
1052                                        " lvm-based instances exist")
1053
1054     # if vg_name not None, checks given volume group on all nodes
1055     if self.op.vg_name:
1056       node_list = self.cfg.GetNodeList()
1057       vglist = rpc.call_vg_list(node_list)
1058       for node in node_list:
1059         vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
1060         if vgstatus:
1061           raise errors.OpPrereqError("Error on node '%s': %s" %
1062                                      (node, vgstatus))
1063
1064   def Exec(self, feedback_fn):
1065     """Change the parameters of the cluster.
1066
1067     """
1068     if self.op.vg_name != self.cfg.GetVGName():
1069       self.cfg.SetVGName(self.op.vg_name)
1070     else:
1071       feedback_fn("Cluster LVM configuration already in desired"
1072                   " state, not changing")
1073
1074
1075 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1076   """Sleep and poll for an instance's disk to sync.
1077
1078   """
1079   if not instance.disks:
1080     return True
1081
1082   if not oneshot:
1083     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1084
1085   node = instance.primary_node
1086
1087   for dev in instance.disks:
1088     cfgw.SetDiskID(dev, node)
1089
1090   retries = 0
1091   while True:
1092     max_time = 0
1093     done = True
1094     cumul_degraded = False
1095     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1096     if not rstats:
1097       proc.LogWarning("Can't get any data from node %s" % node)
1098       retries += 1
1099       if retries >= 10:
1100         raise errors.RemoteError("Can't contact node %s for mirror data,"
1101                                  " aborting." % node)
1102       time.sleep(6)
1103       continue
1104     retries = 0
1105     for i in range(len(rstats)):
1106       mstat = rstats[i]
1107       if mstat is None:
1108         proc.LogWarning("Can't compute data for node %s/%s" %
1109                         (node, instance.disks[i].iv_name))
1110         continue
1111       # we ignore the ldisk parameter
1112       perc_done, est_time, is_degraded, _ = mstat
1113       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1114       if perc_done is not None:
1115         done = False
1116         if est_time is not None:
1117           rem_time = "%d estimated seconds remaining" % est_time
1118           max_time = est_time
1119         else:
1120           rem_time = "no time estimate"
1121         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1122                      (instance.disks[i].iv_name, perc_done, rem_time))
1123     if done or oneshot:
1124       break
1125
1126     if unlock:
1127       utils.Unlock('cmd')
1128     try:
1129       time.sleep(min(60, max_time))
1130     finally:
1131       if unlock:
1132         utils.Lock('cmd')
1133
1134   if done:
1135     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1136   return not cumul_degraded
1137
1138
1139 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1140   """Check that mirrors are not degraded.
1141
1142   The ldisk parameter, if True, will change the test from the
1143   is_degraded attribute (which represents overall non-ok status for
1144   the device(s)) to the ldisk (representing the local storage status).
1145
1146   """
1147   cfgw.SetDiskID(dev, node)
1148   if ldisk:
1149     idx = 6
1150   else:
1151     idx = 5
1152
1153   result = True
1154   if on_primary or dev.AssembleOnSecondary():
1155     rstats = rpc.call_blockdev_find(node, dev)
1156     if not rstats:
1157       logger.ToStderr("Can't get any data from node %s" % node)
1158       result = False
1159     else:
1160       result = result and (not rstats[idx])
1161   if dev.children:
1162     for child in dev.children:
1163       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1164
1165   return result
1166
1167
1168 class LUDiagnoseOS(NoHooksLU):
1169   """Logical unit for OS diagnose/query.
1170
1171   """
1172   _OP_REQP = []
1173
1174   def CheckPrereq(self):
1175     """Check prerequisites.
1176
1177     This always succeeds, since this is a pure query LU.
1178
1179     """
1180     return
1181
1182   def Exec(self, feedback_fn):
1183     """Compute the list of OSes.
1184
1185     """
1186     node_list = self.cfg.GetNodeList()
1187     node_data = rpc.call_os_diagnose(node_list)
1188     if node_data == False:
1189       raise errors.OpExecError("Can't gather the list of OSes")
1190     return node_data
1191
1192
1193 class LURemoveNode(LogicalUnit):
1194   """Logical unit for removing a node.
1195
1196   """
1197   HPATH = "node-remove"
1198   HTYPE = constants.HTYPE_NODE
1199   _OP_REQP = ["node_name"]
1200
1201   def BuildHooksEnv(self):
1202     """Build hooks env.
1203
1204     This doesn't run on the target node in the pre phase as a failed
1205     node would not allows itself to run.
1206
1207     """
1208     env = {
1209       "OP_TARGET": self.op.node_name,
1210       "NODE_NAME": self.op.node_name,
1211       }
1212     all_nodes = self.cfg.GetNodeList()
1213     all_nodes.remove(self.op.node_name)
1214     return env, all_nodes, all_nodes
1215
1216   def CheckPrereq(self):
1217     """Check prerequisites.
1218
1219     This checks:
1220      - the node exists in the configuration
1221      - it does not have primary or secondary instances
1222      - it's not the master
1223
1224     Any errors are signalled by raising errors.OpPrereqError.
1225
1226     """
1227     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1228     if node is None:
1229       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1230
1231     instance_list = self.cfg.GetInstanceList()
1232
1233     masternode = self.sstore.GetMasterNode()
1234     if node.name == masternode:
1235       raise errors.OpPrereqError("Node is the master node,"
1236                                  " you need to failover first.")
1237
1238     for instance_name in instance_list:
1239       instance = self.cfg.GetInstanceInfo(instance_name)
1240       if node.name == instance.primary_node:
1241         raise errors.OpPrereqError("Instance %s still running on the node,"
1242                                    " please remove first." % instance_name)
1243       if node.name in instance.secondary_nodes:
1244         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1245                                    " please remove first." % instance_name)
1246     self.op.node_name = node.name
1247     self.node = node
1248
1249   def Exec(self, feedback_fn):
1250     """Removes the node from the cluster.
1251
1252     """
1253     node = self.node
1254     logger.Info("stopping the node daemon and removing configs from node %s" %
1255                 node.name)
1256
1257     rpc.call_node_leave_cluster(node.name)
1258
1259     self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1260
1261     logger.Info("Removing node %s from config" % node.name)
1262
1263     self.cfg.RemoveNode(node.name)
1264
1265     _RemoveHostFromEtcHosts(node.name)
1266
1267
1268 class LUQueryNodes(NoHooksLU):
1269   """Logical unit for querying nodes.
1270
1271   """
1272   _OP_REQP = ["output_fields", "names"]
1273
1274   def CheckPrereq(self):
1275     """Check prerequisites.
1276
1277     This checks that the fields required are valid output fields.
1278
1279     """
1280     self.dynamic_fields = frozenset(["dtotal", "dfree",
1281                                      "mtotal", "mnode", "mfree",
1282                                      "bootid"])
1283
1284     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1285                                "pinst_list", "sinst_list",
1286                                "pip", "sip"],
1287                        dynamic=self.dynamic_fields,
1288                        selected=self.op.output_fields)
1289
1290     self.wanted = _GetWantedNodes(self, self.op.names)
1291
1292   def Exec(self, feedback_fn):
1293     """Computes the list of nodes and their attributes.
1294
1295     """
1296     nodenames = self.wanted
1297     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1298
1299     # begin data gathering
1300
1301     if self.dynamic_fields.intersection(self.op.output_fields):
1302       live_data = {}
1303       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1304       for name in nodenames:
1305         nodeinfo = node_data.get(name, None)
1306         if nodeinfo:
1307           live_data[name] = {
1308             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1309             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1310             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1311             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1312             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1313             "bootid": nodeinfo['bootid'],
1314             }
1315         else:
1316           live_data[name] = {}
1317     else:
1318       live_data = dict.fromkeys(nodenames, {})
1319
1320     node_to_primary = dict([(name, set()) for name in nodenames])
1321     node_to_secondary = dict([(name, set()) for name in nodenames])
1322
1323     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1324                              "sinst_cnt", "sinst_list"))
1325     if inst_fields & frozenset(self.op.output_fields):
1326       instancelist = self.cfg.GetInstanceList()
1327
1328       for instance_name in instancelist:
1329         inst = self.cfg.GetInstanceInfo(instance_name)
1330         if inst.primary_node in node_to_primary:
1331           node_to_primary[inst.primary_node].add(inst.name)
1332         for secnode in inst.secondary_nodes:
1333           if secnode in node_to_secondary:
1334             node_to_secondary[secnode].add(inst.name)
1335
1336     # end data gathering
1337
1338     output = []
1339     for node in nodelist:
1340       node_output = []
1341       for field in self.op.output_fields:
1342         if field == "name":
1343           val = node.name
1344         elif field == "pinst_list":
1345           val = list(node_to_primary[node.name])
1346         elif field == "sinst_list":
1347           val = list(node_to_secondary[node.name])
1348         elif field == "pinst_cnt":
1349           val = len(node_to_primary[node.name])
1350         elif field == "sinst_cnt":
1351           val = len(node_to_secondary[node.name])
1352         elif field == "pip":
1353           val = node.primary_ip
1354         elif field == "sip":
1355           val = node.secondary_ip
1356         elif field in self.dynamic_fields:
1357           val = live_data[node.name].get(field, None)
1358         else:
1359           raise errors.ParameterError(field)
1360         node_output.append(val)
1361       output.append(node_output)
1362
1363     return output
1364
1365
1366 class LUQueryNodeVolumes(NoHooksLU):
1367   """Logical unit for getting volumes on node(s).
1368
1369   """
1370   _OP_REQP = ["nodes", "output_fields"]
1371
1372   def CheckPrereq(self):
1373     """Check prerequisites.
1374
1375     This checks that the fields required are valid output fields.
1376
1377     """
1378     self.nodes = _GetWantedNodes(self, self.op.nodes)
1379
1380     _CheckOutputFields(static=["node"],
1381                        dynamic=["phys", "vg", "name", "size", "instance"],
1382                        selected=self.op.output_fields)
1383
1384
1385   def Exec(self, feedback_fn):
1386     """Computes the list of nodes and their attributes.
1387
1388     """
1389     nodenames = self.nodes
1390     volumes = rpc.call_node_volumes(nodenames)
1391
1392     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1393              in self.cfg.GetInstanceList()]
1394
1395     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1396
1397     output = []
1398     for node in nodenames:
1399       if node not in volumes or not volumes[node]:
1400         continue
1401
1402       node_vols = volumes[node][:]
1403       node_vols.sort(key=lambda vol: vol['dev'])
1404
1405       for vol in node_vols:
1406         node_output = []
1407         for field in self.op.output_fields:
1408           if field == "node":
1409             val = node
1410           elif field == "phys":
1411             val = vol['dev']
1412           elif field == "vg":
1413             val = vol['vg']
1414           elif field == "name":
1415             val = vol['name']
1416           elif field == "size":
1417             val = int(float(vol['size']))
1418           elif field == "instance":
1419             for inst in ilist:
1420               if node not in lv_by_node[inst]:
1421                 continue
1422               if vol['name'] in lv_by_node[inst][node]:
1423                 val = inst.name
1424                 break
1425             else:
1426               val = '-'
1427           else:
1428             raise errors.ParameterError(field)
1429           node_output.append(str(val))
1430
1431         output.append(node_output)
1432
1433     return output
1434
1435
1436 class LUAddNode(LogicalUnit):
1437   """Logical unit for adding node to the cluster.
1438
1439   """
1440   HPATH = "node-add"
1441   HTYPE = constants.HTYPE_NODE
1442   _OP_REQP = ["node_name"]
1443
1444   def BuildHooksEnv(self):
1445     """Build hooks env.
1446
1447     This will run on all nodes before, and on all nodes + the new node after.
1448
1449     """
1450     env = {
1451       "OP_TARGET": self.op.node_name,
1452       "NODE_NAME": self.op.node_name,
1453       "NODE_PIP": self.op.primary_ip,
1454       "NODE_SIP": self.op.secondary_ip,
1455       }
1456     nodes_0 = self.cfg.GetNodeList()
1457     nodes_1 = nodes_0 + [self.op.node_name, ]
1458     return env, nodes_0, nodes_1
1459
1460   def CheckPrereq(self):
1461     """Check prerequisites.
1462
1463     This checks:
1464      - the new node is not already in the config
1465      - it is resolvable
1466      - its parameters (single/dual homed) matches the cluster
1467
1468     Any errors are signalled by raising errors.OpPrereqError.
1469
1470     """
1471     node_name = self.op.node_name
1472     cfg = self.cfg
1473
1474     dns_data = utils.HostInfo(node_name)
1475
1476     node = dns_data.name
1477     primary_ip = self.op.primary_ip = dns_data.ip
1478     secondary_ip = getattr(self.op, "secondary_ip", None)
1479     if secondary_ip is None:
1480       secondary_ip = primary_ip
1481     if not utils.IsValidIP(secondary_ip):
1482       raise errors.OpPrereqError("Invalid secondary IP given")
1483     self.op.secondary_ip = secondary_ip
1484     node_list = cfg.GetNodeList()
1485     if node in node_list:
1486       raise errors.OpPrereqError("Node %s is already in the configuration"
1487                                  % node)
1488
1489     for existing_node_name in node_list:
1490       existing_node = cfg.GetNodeInfo(existing_node_name)
1491       if (existing_node.primary_ip == primary_ip or
1492           existing_node.secondary_ip == primary_ip or
1493           existing_node.primary_ip == secondary_ip or
1494           existing_node.secondary_ip == secondary_ip):
1495         raise errors.OpPrereqError("New node ip address(es) conflict with"
1496                                    " existing node %s" % existing_node.name)
1497
1498     # check that the type of the node (single versus dual homed) is the
1499     # same as for the master
1500     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1501     master_singlehomed = myself.secondary_ip == myself.primary_ip
1502     newbie_singlehomed = secondary_ip == primary_ip
1503     if master_singlehomed != newbie_singlehomed:
1504       if master_singlehomed:
1505         raise errors.OpPrereqError("The master has no private ip but the"
1506                                    " new node has one")
1507       else:
1508         raise errors.OpPrereqError("The master has a private ip but the"
1509                                    " new node doesn't have one")
1510
1511     # checks reachablity
1512     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1513       raise errors.OpPrereqError("Node not reachable by ping")
1514
1515     if not newbie_singlehomed:
1516       # check reachability from my secondary ip to newbie's secondary ip
1517       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1518                            source=myself.secondary_ip):
1519         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1520                                    " based ping to noded port")
1521
1522     self.new_node = objects.Node(name=node,
1523                                  primary_ip=primary_ip,
1524                                  secondary_ip=secondary_ip)
1525
1526     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1527       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1528         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1529                                    constants.VNC_PASSWORD_FILE)
1530
1531   def Exec(self, feedback_fn):
1532     """Adds the new node to the cluster.
1533
1534     """
1535     new_node = self.new_node
1536     node = new_node.name
1537
1538     # set up inter-node password and certificate and restarts the node daemon
1539     gntpass = self.sstore.GetNodeDaemonPassword()
1540     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1541       raise errors.OpExecError("ganeti password corruption detected")
1542     f = open(constants.SSL_CERT_FILE)
1543     try:
1544       gntpem = f.read(8192)
1545     finally:
1546       f.close()
1547     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1548     # so we use this to detect an invalid certificate; as long as the
1549     # cert doesn't contain this, the here-document will be correctly
1550     # parsed by the shell sequence below
1551     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1552       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1553     if not gntpem.endswith("\n"):
1554       raise errors.OpExecError("PEM must end with newline")
1555     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1556
1557     # and then connect with ssh to set password and start ganeti-noded
1558     # note that all the below variables are sanitized at this point,
1559     # either by being constants or by the checks above
1560     ss = self.sstore
1561     mycommand = ("umask 077 && "
1562                  "echo '%s' > '%s' && "
1563                  "cat > '%s' << '!EOF.' && \n"
1564                  "%s!EOF.\n%s restart" %
1565                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1566                   constants.SSL_CERT_FILE, gntpem,
1567                   constants.NODE_INITD_SCRIPT))
1568
1569     result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1570     if result.failed:
1571       raise errors.OpExecError("Remote command on node %s, error: %s,"
1572                                " output: %s" %
1573                                (node, result.fail_reason, result.output))
1574
1575     # check connectivity
1576     time.sleep(4)
1577
1578     result = rpc.call_version([node])[node]
1579     if result:
1580       if constants.PROTOCOL_VERSION == result:
1581         logger.Info("communication to node %s fine, sw version %s match" %
1582                     (node, result))
1583       else:
1584         raise errors.OpExecError("Version mismatch master version %s,"
1585                                  " node version %s" %
1586                                  (constants.PROTOCOL_VERSION, result))
1587     else:
1588       raise errors.OpExecError("Cannot get version from the new node")
1589
1590     # setup ssh on node
1591     logger.Info("copy ssh key to node %s" % node)
1592     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1593     keyarray = []
1594     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1595                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1596                 priv_key, pub_key]
1597
1598     for i in keyfiles:
1599       f = open(i, 'r')
1600       try:
1601         keyarray.append(f.read())
1602       finally:
1603         f.close()
1604
1605     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1606                                keyarray[3], keyarray[4], keyarray[5])
1607
1608     if not result:
1609       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1610
1611     # Add node to our /etc/hosts, and add key to known_hosts
1612     _AddHostToEtcHosts(new_node.name)
1613
1614     if new_node.secondary_ip != new_node.primary_ip:
1615       if not rpc.call_node_tcp_ping(new_node.name,
1616                                     constants.LOCALHOST_IP_ADDRESS,
1617                                     new_node.secondary_ip,
1618                                     constants.DEFAULT_NODED_PORT,
1619                                     10, False):
1620         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1621                                  " you gave (%s). Please fix and re-run this"
1622                                  " command." % new_node.secondary_ip)
1623
1624     success, msg = self.ssh.VerifyNodeHostname(node)
1625     if not success:
1626       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1627                                " than the one the resolver gives: %s."
1628                                " Please fix and re-run this command." %
1629                                (node, msg))
1630
1631     # Distribute updated /etc/hosts and known_hosts to all nodes,
1632     # including the node just added
1633     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1634     dist_nodes = self.cfg.GetNodeList() + [node]
1635     if myself.name in dist_nodes:
1636       dist_nodes.remove(myself.name)
1637
1638     logger.Debug("Copying hosts and known_hosts to all nodes")
1639     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1640       result = rpc.call_upload_file(dist_nodes, fname)
1641       for to_node in dist_nodes:
1642         if not result[to_node]:
1643           logger.Error("copy of file %s to node %s failed" %
1644                        (fname, to_node))
1645
1646     to_copy = ss.GetFileList()
1647     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1648       to_copy.append(constants.VNC_PASSWORD_FILE)
1649     for fname in to_copy:
1650       if not self.ssh.CopyFileToNode(node, fname):
1651         logger.Error("could not copy file %s to node %s" % (fname, node))
1652
1653     logger.Info("adding node %s to cluster.conf" % node)
1654     self.cfg.AddNode(new_node)
1655
1656
1657 class LUMasterFailover(LogicalUnit):
1658   """Failover the master node to the current node.
1659
1660   This is a special LU in that it must run on a non-master node.
1661
1662   """
1663   HPATH = "master-failover"
1664   HTYPE = constants.HTYPE_CLUSTER
1665   REQ_MASTER = False
1666   _OP_REQP = []
1667
1668   def BuildHooksEnv(self):
1669     """Build hooks env.
1670
1671     This will run on the new master only in the pre phase, and on all
1672     the nodes in the post phase.
1673
1674     """
1675     env = {
1676       "OP_TARGET": self.new_master,
1677       "NEW_MASTER": self.new_master,
1678       "OLD_MASTER": self.old_master,
1679       }
1680     return env, [self.new_master], self.cfg.GetNodeList()
1681
1682   def CheckPrereq(self):
1683     """Check prerequisites.
1684
1685     This checks that we are not already the master.
1686
1687     """
1688     self.new_master = utils.HostInfo().name
1689     self.old_master = self.sstore.GetMasterNode()
1690
1691     if self.old_master == self.new_master:
1692       raise errors.OpPrereqError("This commands must be run on the node"
1693                                  " where you want the new master to be."
1694                                  " %s is already the master" %
1695                                  self.old_master)
1696
1697   def Exec(self, feedback_fn):
1698     """Failover the master node.
1699
1700     This command, when run on a non-master node, will cause the current
1701     master to cease being master, and the non-master to become new
1702     master.
1703
1704     """
1705     #TODO: do not rely on gethostname returning the FQDN
1706     logger.Info("setting master to %s, old master: %s" %
1707                 (self.new_master, self.old_master))
1708
1709     if not rpc.call_node_stop_master(self.old_master):
1710       logger.Error("could disable the master role on the old master"
1711                    " %s, please disable manually" % self.old_master)
1712
1713     ss = self.sstore
1714     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1715     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1716                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1717       logger.Error("could not distribute the new simple store master file"
1718                    " to the other nodes, please check.")
1719
1720     if not rpc.call_node_start_master(self.new_master):
1721       logger.Error("could not start the master role on the new master"
1722                    " %s, please check" % self.new_master)
1723       feedback_fn("Error in activating the master IP on the new master,"
1724                   " please fix manually.")
1725
1726
1727
1728 class LUQueryClusterInfo(NoHooksLU):
1729   """Query cluster configuration.
1730
1731   """
1732   _OP_REQP = []
1733   REQ_MASTER = False
1734
1735   def CheckPrereq(self):
1736     """No prerequsites needed for this LU.
1737
1738     """
1739     pass
1740
1741   def Exec(self, feedback_fn):
1742     """Return cluster config.
1743
1744     """
1745     result = {
1746       "name": self.sstore.GetClusterName(),
1747       "software_version": constants.RELEASE_VERSION,
1748       "protocol_version": constants.PROTOCOL_VERSION,
1749       "config_version": constants.CONFIG_VERSION,
1750       "os_api_version": constants.OS_API_VERSION,
1751       "export_version": constants.EXPORT_VERSION,
1752       "master": self.sstore.GetMasterNode(),
1753       "architecture": (platform.architecture()[0], platform.machine()),
1754       }
1755
1756     return result
1757
1758
1759 class LUClusterCopyFile(NoHooksLU):
1760   """Copy file to cluster.
1761
1762   """
1763   _OP_REQP = ["nodes", "filename"]
1764
1765   def CheckPrereq(self):
1766     """Check prerequisites.
1767
1768     It should check that the named file exists and that the given list
1769     of nodes is valid.
1770
1771     """
1772     if not os.path.exists(self.op.filename):
1773       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1774
1775     self.nodes = _GetWantedNodes(self, self.op.nodes)
1776
1777   def Exec(self, feedback_fn):
1778     """Copy a file from master to some nodes.
1779
1780     Args:
1781       opts - class with options as members
1782       args - list containing a single element, the file name
1783     Opts used:
1784       nodes - list containing the name of target nodes; if empty, all nodes
1785
1786     """
1787     filename = self.op.filename
1788
1789     myname = utils.HostInfo().name
1790
1791     for node in self.nodes:
1792       if node == myname:
1793         continue
1794       if not self.ssh.CopyFileToNode(node, filename):
1795         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1796
1797
1798 class LUDumpClusterConfig(NoHooksLU):
1799   """Return a text-representation of the cluster-config.
1800
1801   """
1802   _OP_REQP = []
1803
1804   def CheckPrereq(self):
1805     """No prerequisites.
1806
1807     """
1808     pass
1809
1810   def Exec(self, feedback_fn):
1811     """Dump a representation of the cluster config to the standard output.
1812
1813     """
1814     return self.cfg.DumpConfig()
1815
1816
1817 class LURunClusterCommand(NoHooksLU):
1818   """Run a command on some nodes.
1819
1820   """
1821   _OP_REQP = ["command", "nodes"]
1822
1823   def CheckPrereq(self):
1824     """Check prerequisites.
1825
1826     It checks that the given list of nodes is valid.
1827
1828     """
1829     self.nodes = _GetWantedNodes(self, self.op.nodes)
1830
1831   def Exec(self, feedback_fn):
1832     """Run a command on some nodes.
1833
1834     """
1835     # put the master at the end of the nodes list
1836     master_node = self.sstore.GetMasterNode()
1837     if master_node in self.nodes:
1838       self.nodes.remove(master_node)
1839       self.nodes.append(master_node)
1840
1841     data = []
1842     for node in self.nodes:
1843       result = self.ssh.Run(node, "root", self.op.command)
1844       data.append((node, result.output, result.exit_code))
1845
1846     return data
1847
1848
1849 class LUActivateInstanceDisks(NoHooksLU):
1850   """Bring up an instance's disks.
1851
1852   """
1853   _OP_REQP = ["instance_name"]
1854
1855   def CheckPrereq(self):
1856     """Check prerequisites.
1857
1858     This checks that the instance is in the cluster.
1859
1860     """
1861     instance = self.cfg.GetInstanceInfo(
1862       self.cfg.ExpandInstanceName(self.op.instance_name))
1863     if instance is None:
1864       raise errors.OpPrereqError("Instance '%s' not known" %
1865                                  self.op.instance_name)
1866     self.instance = instance
1867
1868
1869   def Exec(self, feedback_fn):
1870     """Activate the disks.
1871
1872     """
1873     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1874     if not disks_ok:
1875       raise errors.OpExecError("Cannot activate block devices")
1876
1877     return disks_info
1878
1879
1880 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1881   """Prepare the block devices for an instance.
1882
1883   This sets up the block devices on all nodes.
1884
1885   Args:
1886     instance: a ganeti.objects.Instance object
1887     ignore_secondaries: if true, errors on secondary nodes won't result
1888                         in an error return from the function
1889
1890   Returns:
1891     false if the operation failed
1892     list of (host, instance_visible_name, node_visible_name) if the operation
1893          suceeded with the mapping from node devices to instance devices
1894   """
1895   device_info = []
1896   disks_ok = True
1897   iname = instance.name
1898   # With the two passes mechanism we try to reduce the window of
1899   # opportunity for the race condition of switching DRBD to primary
1900   # before handshaking occured, but we do not eliminate it
1901
1902   # The proper fix would be to wait (with some limits) until the
1903   # connection has been made and drbd transitions from WFConnection
1904   # into any other network-connected state (Connected, SyncTarget,
1905   # SyncSource, etc.)
1906
1907   # 1st pass, assemble on all nodes in secondary mode
1908   for inst_disk in instance.disks:
1909     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1910       cfg.SetDiskID(node_disk, node)
1911       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1912       if not result:
1913         logger.Error("could not prepare block device %s on node %s"
1914                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1915         if not ignore_secondaries:
1916           disks_ok = False
1917
1918   # FIXME: race condition on drbd migration to primary
1919
1920   # 2nd pass, do only the primary node
1921   for inst_disk in instance.disks:
1922     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1923       if node != instance.primary_node:
1924         continue
1925       cfg.SetDiskID(node_disk, node)
1926       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1927       if not result:
1928         logger.Error("could not prepare block device %s on node %s"
1929                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1930         disks_ok = False
1931     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1932
1933   # leave the disks configured for the primary node
1934   # this is a workaround that would be fixed better by
1935   # improving the logical/physical id handling
1936   for disk in instance.disks:
1937     cfg.SetDiskID(disk, instance.primary_node)
1938
1939   return disks_ok, device_info
1940
1941
1942 def _StartInstanceDisks(cfg, instance, force):
1943   """Start the disks of an instance.
1944
1945   """
1946   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1947                                            ignore_secondaries=force)
1948   if not disks_ok:
1949     _ShutdownInstanceDisks(instance, cfg)
1950     if force is not None and not force:
1951       logger.Error("If the message above refers to a secondary node,"
1952                    " you can retry the operation using '--force'.")
1953     raise errors.OpExecError("Disk consistency error")
1954
1955
1956 class LUDeactivateInstanceDisks(NoHooksLU):
1957   """Shutdown an instance's disks.
1958
1959   """
1960   _OP_REQP = ["instance_name"]
1961
1962   def CheckPrereq(self):
1963     """Check prerequisites.
1964
1965     This checks that the instance is in the cluster.
1966
1967     """
1968     instance = self.cfg.GetInstanceInfo(
1969       self.cfg.ExpandInstanceName(self.op.instance_name))
1970     if instance is None:
1971       raise errors.OpPrereqError("Instance '%s' not known" %
1972                                  self.op.instance_name)
1973     self.instance = instance
1974
1975   def Exec(self, feedback_fn):
1976     """Deactivate the disks
1977
1978     """
1979     instance = self.instance
1980     ins_l = rpc.call_instance_list([instance.primary_node])
1981     ins_l = ins_l[instance.primary_node]
1982     if not type(ins_l) is list:
1983       raise errors.OpExecError("Can't contact node '%s'" %
1984                                instance.primary_node)
1985
1986     if self.instance.name in ins_l:
1987       raise errors.OpExecError("Instance is running, can't shutdown"
1988                                " block devices.")
1989
1990     _ShutdownInstanceDisks(instance, self.cfg)
1991
1992
1993 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1994   """Shutdown block devices of an instance.
1995
1996   This does the shutdown on all nodes of the instance.
1997
1998   If the ignore_primary is false, errors on the primary node are
1999   ignored.
2000
2001   """
2002   result = True
2003   for disk in instance.disks:
2004     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2005       cfg.SetDiskID(top_disk, node)
2006       if not rpc.call_blockdev_shutdown(node, top_disk):
2007         logger.Error("could not shutdown block device %s on node %s" %
2008                      (disk.iv_name, node))
2009         if not ignore_primary or node != instance.primary_node:
2010           result = False
2011   return result
2012
2013
2014 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2015   """Checks if a node has enough free memory.
2016
2017   This function check if a given node has the needed amount of free
2018   memory. In case the node has less memory or we cannot get the
2019   information from the node, this function raise an OpPrereqError
2020   exception.
2021
2022   Args:
2023     - cfg: a ConfigWriter instance
2024     - node: the node name
2025     - reason: string to use in the error message
2026     - requested: the amount of memory in MiB
2027
2028   """
2029   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2030   if not nodeinfo or not isinstance(nodeinfo, dict):
2031     raise errors.OpPrereqError("Could not contact node %s for resource"
2032                              " information" % (node,))
2033
2034   free_mem = nodeinfo[node].get('memory_free')
2035   if not isinstance(free_mem, int):
2036     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2037                              " was '%s'" % (node, free_mem))
2038   if requested > free_mem:
2039     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2040                              " needed %s MiB, available %s MiB" %
2041                              (node, reason, requested, free_mem))
2042
2043
2044 class LUStartupInstance(LogicalUnit):
2045   """Starts an instance.
2046
2047   """
2048   HPATH = "instance-start"
2049   HTYPE = constants.HTYPE_INSTANCE
2050   _OP_REQP = ["instance_name", "force"]
2051
2052   def BuildHooksEnv(self):
2053     """Build hooks env.
2054
2055     This runs on master, primary and secondary nodes of the instance.
2056
2057     """
2058     env = {
2059       "FORCE": self.op.force,
2060       }
2061     env.update(_BuildInstanceHookEnvByObject(self.instance))
2062     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2063           list(self.instance.secondary_nodes))
2064     return env, nl, nl
2065
2066   def CheckPrereq(self):
2067     """Check prerequisites.
2068
2069     This checks that the instance is in the cluster.
2070
2071     """
2072     instance = self.cfg.GetInstanceInfo(
2073       self.cfg.ExpandInstanceName(self.op.instance_name))
2074     if instance is None:
2075       raise errors.OpPrereqError("Instance '%s' not known" %
2076                                  self.op.instance_name)
2077
2078     # check bridges existance
2079     _CheckInstanceBridgesExist(instance)
2080
2081     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2082                          "starting instance %s" % instance.name,
2083                          instance.memory)
2084
2085     self.instance = instance
2086     self.op.instance_name = instance.name
2087
2088   def Exec(self, feedback_fn):
2089     """Start the instance.
2090
2091     """
2092     instance = self.instance
2093     force = self.op.force
2094     extra_args = getattr(self.op, "extra_args", "")
2095
2096     self.cfg.MarkInstanceUp(instance.name)
2097
2098     node_current = instance.primary_node
2099
2100     _StartInstanceDisks(self.cfg, instance, force)
2101
2102     if not rpc.call_instance_start(node_current, instance, extra_args):
2103       _ShutdownInstanceDisks(instance, self.cfg)
2104       raise errors.OpExecError("Could not start instance")
2105
2106
2107 class LURebootInstance(LogicalUnit):
2108   """Reboot an instance.
2109
2110   """
2111   HPATH = "instance-reboot"
2112   HTYPE = constants.HTYPE_INSTANCE
2113   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2114
2115   def BuildHooksEnv(self):
2116     """Build hooks env.
2117
2118     This runs on master, primary and secondary nodes of the instance.
2119
2120     """
2121     env = {
2122       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2123       }
2124     env.update(_BuildInstanceHookEnvByObject(self.instance))
2125     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2126           list(self.instance.secondary_nodes))
2127     return env, nl, nl
2128
2129   def CheckPrereq(self):
2130     """Check prerequisites.
2131
2132     This checks that the instance is in the cluster.
2133
2134     """
2135     instance = self.cfg.GetInstanceInfo(
2136       self.cfg.ExpandInstanceName(self.op.instance_name))
2137     if instance is None:
2138       raise errors.OpPrereqError("Instance '%s' not known" %
2139                                  self.op.instance_name)
2140
2141     # check bridges existance
2142     _CheckInstanceBridgesExist(instance)
2143
2144     self.instance = instance
2145     self.op.instance_name = instance.name
2146
2147   def Exec(self, feedback_fn):
2148     """Reboot the instance.
2149
2150     """
2151     instance = self.instance
2152     ignore_secondaries = self.op.ignore_secondaries
2153     reboot_type = self.op.reboot_type
2154     extra_args = getattr(self.op, "extra_args", "")
2155
2156     node_current = instance.primary_node
2157
2158     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2159                            constants.INSTANCE_REBOOT_HARD,
2160                            constants.INSTANCE_REBOOT_FULL]:
2161       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2162                                   (constants.INSTANCE_REBOOT_SOFT,
2163                                    constants.INSTANCE_REBOOT_HARD,
2164                                    constants.INSTANCE_REBOOT_FULL))
2165
2166     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2167                        constants.INSTANCE_REBOOT_HARD]:
2168       if not rpc.call_instance_reboot(node_current, instance,
2169                                       reboot_type, extra_args):
2170         raise errors.OpExecError("Could not reboot instance")
2171     else:
2172       if not rpc.call_instance_shutdown(node_current, instance):
2173         raise errors.OpExecError("could not shutdown instance for full reboot")
2174       _ShutdownInstanceDisks(instance, self.cfg)
2175       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2176       if not rpc.call_instance_start(node_current, instance, extra_args):
2177         _ShutdownInstanceDisks(instance, self.cfg)
2178         raise errors.OpExecError("Could not start instance for full reboot")
2179
2180     self.cfg.MarkInstanceUp(instance.name)
2181
2182
2183 class LUShutdownInstance(LogicalUnit):
2184   """Shutdown an instance.
2185
2186   """
2187   HPATH = "instance-stop"
2188   HTYPE = constants.HTYPE_INSTANCE
2189   _OP_REQP = ["instance_name"]
2190
2191   def BuildHooksEnv(self):
2192     """Build hooks env.
2193
2194     This runs on master, primary and secondary nodes of the instance.
2195
2196     """
2197     env = _BuildInstanceHookEnvByObject(self.instance)
2198     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2199           list(self.instance.secondary_nodes))
2200     return env, nl, nl
2201
2202   def CheckPrereq(self):
2203     """Check prerequisites.
2204
2205     This checks that the instance is in the cluster.
2206
2207     """
2208     instance = self.cfg.GetInstanceInfo(
2209       self.cfg.ExpandInstanceName(self.op.instance_name))
2210     if instance is None:
2211       raise errors.OpPrereqError("Instance '%s' not known" %
2212                                  self.op.instance_name)
2213     self.instance = instance
2214
2215   def Exec(self, feedback_fn):
2216     """Shutdown the instance.
2217
2218     """
2219     instance = self.instance
2220     node_current = instance.primary_node
2221     self.cfg.MarkInstanceDown(instance.name)
2222     if not rpc.call_instance_shutdown(node_current, instance):
2223       logger.Error("could not shutdown instance")
2224
2225     _ShutdownInstanceDisks(instance, self.cfg)
2226
2227
2228 class LUReinstallInstance(LogicalUnit):
2229   """Reinstall an instance.
2230
2231   """
2232   HPATH = "instance-reinstall"
2233   HTYPE = constants.HTYPE_INSTANCE
2234   _OP_REQP = ["instance_name"]
2235
2236   def BuildHooksEnv(self):
2237     """Build hooks env.
2238
2239     This runs on master, primary and secondary nodes of the instance.
2240
2241     """
2242     env = _BuildInstanceHookEnvByObject(self.instance)
2243     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2244           list(self.instance.secondary_nodes))
2245     return env, nl, nl
2246
2247   def CheckPrereq(self):
2248     """Check prerequisites.
2249
2250     This checks that the instance is in the cluster and is not running.
2251
2252     """
2253     instance = self.cfg.GetInstanceInfo(
2254       self.cfg.ExpandInstanceName(self.op.instance_name))
2255     if instance is None:
2256       raise errors.OpPrereqError("Instance '%s' not known" %
2257                                  self.op.instance_name)
2258     if instance.disk_template == constants.DT_DISKLESS:
2259       raise errors.OpPrereqError("Instance '%s' has no disks" %
2260                                  self.op.instance_name)
2261     if instance.status != "down":
2262       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2263                                  self.op.instance_name)
2264     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2265     if remote_info:
2266       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2267                                  (self.op.instance_name,
2268                                   instance.primary_node))
2269
2270     self.op.os_type = getattr(self.op, "os_type", None)
2271     if self.op.os_type is not None:
2272       # OS verification
2273       pnode = self.cfg.GetNodeInfo(
2274         self.cfg.ExpandNodeName(instance.primary_node))
2275       if pnode is None:
2276         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2277                                    self.op.pnode)
2278       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2279       if not os_obj:
2280         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2281                                    " primary node"  % self.op.os_type)
2282
2283     self.instance = instance
2284
2285   def Exec(self, feedback_fn):
2286     """Reinstall the instance.
2287
2288     """
2289     inst = self.instance
2290
2291     if self.op.os_type is not None:
2292       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2293       inst.os = self.op.os_type
2294       self.cfg.AddInstance(inst)
2295
2296     _StartInstanceDisks(self.cfg, inst, None)
2297     try:
2298       feedback_fn("Running the instance OS create scripts...")
2299       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2300         raise errors.OpExecError("Could not install OS for instance %s"
2301                                  " on node %s" %
2302                                  (inst.name, inst.primary_node))
2303     finally:
2304       _ShutdownInstanceDisks(inst, self.cfg)
2305
2306
2307 class LURenameInstance(LogicalUnit):
2308   """Rename an instance.
2309
2310   """
2311   HPATH = "instance-rename"
2312   HTYPE = constants.HTYPE_INSTANCE
2313   _OP_REQP = ["instance_name", "new_name"]
2314
2315   def BuildHooksEnv(self):
2316     """Build hooks env.
2317
2318     This runs on master, primary and secondary nodes of the instance.
2319
2320     """
2321     env = _BuildInstanceHookEnvByObject(self.instance)
2322     env["INSTANCE_NEW_NAME"] = self.op.new_name
2323     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2324           list(self.instance.secondary_nodes))
2325     return env, nl, nl
2326
2327   def CheckPrereq(self):
2328     """Check prerequisites.
2329
2330     This checks that the instance is in the cluster and is not running.
2331
2332     """
2333     instance = self.cfg.GetInstanceInfo(
2334       self.cfg.ExpandInstanceName(self.op.instance_name))
2335     if instance is None:
2336       raise errors.OpPrereqError("Instance '%s' not known" %
2337                                  self.op.instance_name)
2338     if instance.status != "down":
2339       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2340                                  self.op.instance_name)
2341     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2342     if remote_info:
2343       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2344                                  (self.op.instance_name,
2345                                   instance.primary_node))
2346     self.instance = instance
2347
2348     # new name verification
2349     name_info = utils.HostInfo(self.op.new_name)
2350
2351     self.op.new_name = new_name = name_info.name
2352     instance_list = self.cfg.GetInstanceList()
2353     if new_name in instance_list:
2354       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2355                                  instance_name)
2356
2357     if not getattr(self.op, "ignore_ip", False):
2358       command = ["fping", "-q", name_info.ip]
2359       result = utils.RunCmd(command)
2360       if not result.failed:
2361         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2362                                    (name_info.ip, new_name))
2363
2364
2365   def Exec(self, feedback_fn):
2366     """Reinstall the instance.
2367
2368     """
2369     inst = self.instance
2370     old_name = inst.name
2371
2372     self.cfg.RenameInstance(inst.name, self.op.new_name)
2373
2374     # re-read the instance from the configuration after rename
2375     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2376
2377     _StartInstanceDisks(self.cfg, inst, None)
2378     try:
2379       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2380                                           "sda", "sdb"):
2381         msg = ("Could run OS rename script for instance %s on node %s (but the"
2382                " instance has been renamed in Ganeti)" %
2383                (inst.name, inst.primary_node))
2384         logger.Error(msg)
2385     finally:
2386       _ShutdownInstanceDisks(inst, self.cfg)
2387
2388
2389 class LURemoveInstance(LogicalUnit):
2390   """Remove an instance.
2391
2392   """
2393   HPATH = "instance-remove"
2394   HTYPE = constants.HTYPE_INSTANCE
2395   _OP_REQP = ["instance_name"]
2396
2397   def BuildHooksEnv(self):
2398     """Build hooks env.
2399
2400     This runs on master, primary and secondary nodes of the instance.
2401
2402     """
2403     env = _BuildInstanceHookEnvByObject(self.instance)
2404     nl = [self.sstore.GetMasterNode()]
2405     return env, nl, nl
2406
2407   def CheckPrereq(self):
2408     """Check prerequisites.
2409
2410     This checks that the instance is in the cluster.
2411
2412     """
2413     instance = self.cfg.GetInstanceInfo(
2414       self.cfg.ExpandInstanceName(self.op.instance_name))
2415     if instance is None:
2416       raise errors.OpPrereqError("Instance '%s' not known" %
2417                                  self.op.instance_name)
2418     self.instance = instance
2419
2420   def Exec(self, feedback_fn):
2421     """Remove the instance.
2422
2423     """
2424     instance = self.instance
2425     logger.Info("shutting down instance %s on node %s" %
2426                 (instance.name, instance.primary_node))
2427
2428     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2429       if self.op.ignore_failures:
2430         feedback_fn("Warning: can't shutdown instance")
2431       else:
2432         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2433                                  (instance.name, instance.primary_node))
2434
2435     logger.Info("removing block devices for instance %s" % instance.name)
2436
2437     if not _RemoveDisks(instance, self.cfg):
2438       if self.op.ignore_failures:
2439         feedback_fn("Warning: can't remove instance's disks")
2440       else:
2441         raise errors.OpExecError("Can't remove instance's disks")
2442
2443     logger.Info("removing instance %s out of cluster config" % instance.name)
2444
2445     self.cfg.RemoveInstance(instance.name)
2446
2447
2448 class LUQueryInstances(NoHooksLU):
2449   """Logical unit for querying instances.
2450
2451   """
2452   _OP_REQP = ["output_fields", "names"]
2453
2454   def CheckPrereq(self):
2455     """Check prerequisites.
2456
2457     This checks that the fields required are valid output fields.
2458
2459     """
2460     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2461     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2462                                "admin_state", "admin_ram",
2463                                "disk_template", "ip", "mac", "bridge",
2464                                "sda_size", "sdb_size", "vcpus"],
2465                        dynamic=self.dynamic_fields,
2466                        selected=self.op.output_fields)
2467
2468     self.wanted = _GetWantedInstances(self, self.op.names)
2469
2470   def Exec(self, feedback_fn):
2471     """Computes the list of nodes and their attributes.
2472
2473     """
2474     instance_names = self.wanted
2475     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2476                      in instance_names]
2477
2478     # begin data gathering
2479
2480     nodes = frozenset([inst.primary_node for inst in instance_list])
2481
2482     bad_nodes = []
2483     if self.dynamic_fields.intersection(self.op.output_fields):
2484       live_data = {}
2485       node_data = rpc.call_all_instances_info(nodes)
2486       for name in nodes:
2487         result = node_data[name]
2488         if result:
2489           live_data.update(result)
2490         elif result == False:
2491           bad_nodes.append(name)
2492         # else no instance is alive
2493     else:
2494       live_data = dict([(name, {}) for name in instance_names])
2495
2496     # end data gathering
2497
2498     output = []
2499     for instance in instance_list:
2500       iout = []
2501       for field in self.op.output_fields:
2502         if field == "name":
2503           val = instance.name
2504         elif field == "os":
2505           val = instance.os
2506         elif field == "pnode":
2507           val = instance.primary_node
2508         elif field == "snodes":
2509           val = list(instance.secondary_nodes)
2510         elif field == "admin_state":
2511           val = (instance.status != "down")
2512         elif field == "oper_state":
2513           if instance.primary_node in bad_nodes:
2514             val = None
2515           else:
2516             val = bool(live_data.get(instance.name))
2517         elif field == "status":
2518           if instance.primary_node in bad_nodes:
2519             val = "ERROR_nodedown"
2520           else:
2521             running = bool(live_data.get(instance.name))
2522             if running:
2523               if instance.status != "down":
2524                 val = "running"
2525               else:
2526                 val = "ERROR_up"
2527             else:
2528               if instance.status != "down":
2529                 val = "ERROR_down"
2530               else:
2531                 val = "ADMIN_down"
2532         elif field == "admin_ram":
2533           val = instance.memory
2534         elif field == "oper_ram":
2535           if instance.primary_node in bad_nodes:
2536             val = None
2537           elif instance.name in live_data:
2538             val = live_data[instance.name].get("memory", "?")
2539           else:
2540             val = "-"
2541         elif field == "disk_template":
2542           val = instance.disk_template
2543         elif field == "ip":
2544           val = instance.nics[0].ip
2545         elif field == "bridge":
2546           val = instance.nics[0].bridge
2547         elif field == "mac":
2548           val = instance.nics[0].mac
2549         elif field == "sda_size" or field == "sdb_size":
2550           disk = instance.FindDisk(field[:3])
2551           if disk is None:
2552             val = None
2553           else:
2554             val = disk.size
2555         elif field == "vcpus":
2556           val = instance.vcpus
2557         else:
2558           raise errors.ParameterError(field)
2559         iout.append(val)
2560       output.append(iout)
2561
2562     return output
2563
2564
2565 class LUFailoverInstance(LogicalUnit):
2566   """Failover an instance.
2567
2568   """
2569   HPATH = "instance-failover"
2570   HTYPE = constants.HTYPE_INSTANCE
2571   _OP_REQP = ["instance_name", "ignore_consistency"]
2572
2573   def BuildHooksEnv(self):
2574     """Build hooks env.
2575
2576     This runs on master, primary and secondary nodes of the instance.
2577
2578     """
2579     env = {
2580       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2581       }
2582     env.update(_BuildInstanceHookEnvByObject(self.instance))
2583     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2584     return env, nl, nl
2585
2586   def CheckPrereq(self):
2587     """Check prerequisites.
2588
2589     This checks that the instance is in the cluster.
2590
2591     """
2592     instance = self.cfg.GetInstanceInfo(
2593       self.cfg.ExpandInstanceName(self.op.instance_name))
2594     if instance is None:
2595       raise errors.OpPrereqError("Instance '%s' not known" %
2596                                  self.op.instance_name)
2597
2598     if instance.disk_template not in constants.DTS_NET_MIRROR:
2599       raise errors.OpPrereqError("Instance's disk layout is not"
2600                                  " network mirrored, cannot failover.")
2601
2602     secondary_nodes = instance.secondary_nodes
2603     if not secondary_nodes:
2604       raise errors.ProgrammerError("no secondary node but using "
2605                                    "DT_REMOTE_RAID1 template")
2606
2607     target_node = secondary_nodes[0]
2608     # check memory requirements on the secondary node
2609     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2610                          instance.name, instance.memory)
2611
2612     # check bridge existance
2613     brlist = [nic.bridge for nic in instance.nics]
2614     if not rpc.call_bridges_exist(target_node, brlist):
2615       raise errors.OpPrereqError("One or more target bridges %s does not"
2616                                  " exist on destination node '%s'" %
2617                                  (brlist, target_node))
2618
2619     self.instance = instance
2620
2621   def Exec(self, feedback_fn):
2622     """Failover an instance.
2623
2624     The failover is done by shutting it down on its present node and
2625     starting it on the secondary.
2626
2627     """
2628     instance = self.instance
2629
2630     source_node = instance.primary_node
2631     target_node = instance.secondary_nodes[0]
2632
2633     feedback_fn("* checking disk consistency between source and target")
2634     for dev in instance.disks:
2635       # for remote_raid1, these are md over drbd
2636       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2637         if instance.status == "up" and not self.op.ignore_consistency:
2638           raise errors.OpExecError("Disk %s is degraded on target node,"
2639                                    " aborting failover." % dev.iv_name)
2640
2641     feedback_fn("* shutting down instance on source node")
2642     logger.Info("Shutting down instance %s on node %s" %
2643                 (instance.name, source_node))
2644
2645     if not rpc.call_instance_shutdown(source_node, instance):
2646       if self.op.ignore_consistency:
2647         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2648                      " anyway. Please make sure node %s is down"  %
2649                      (instance.name, source_node, source_node))
2650       else:
2651         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2652                                  (instance.name, source_node))
2653
2654     feedback_fn("* deactivating the instance's disks on source node")
2655     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2656       raise errors.OpExecError("Can't shut down the instance's disks.")
2657
2658     instance.primary_node = target_node
2659     # distribute new instance config to the other nodes
2660     self.cfg.AddInstance(instance)
2661
2662     # Only start the instance if it's marked as up
2663     if instance.status == "up":
2664       feedback_fn("* activating the instance's disks on target node")
2665       logger.Info("Starting instance %s on node %s" %
2666                   (instance.name, target_node))
2667
2668       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2669                                                ignore_secondaries=True)
2670       if not disks_ok:
2671         _ShutdownInstanceDisks(instance, self.cfg)
2672         raise errors.OpExecError("Can't activate the instance's disks")
2673
2674       feedback_fn("* starting the instance on the target node")
2675       if not rpc.call_instance_start(target_node, instance, None):
2676         _ShutdownInstanceDisks(instance, self.cfg)
2677         raise errors.OpExecError("Could not start instance %s on node %s." %
2678                                  (instance.name, target_node))
2679
2680
2681 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2682   """Create a tree of block devices on the primary node.
2683
2684   This always creates all devices.
2685
2686   """
2687   if device.children:
2688     for child in device.children:
2689       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2690         return False
2691
2692   cfg.SetDiskID(device, node)
2693   new_id = rpc.call_blockdev_create(node, device, device.size,
2694                                     instance.name, True, info)
2695   if not new_id:
2696     return False
2697   if device.physical_id is None:
2698     device.physical_id = new_id
2699   return True
2700
2701
2702 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2703   """Create a tree of block devices on a secondary node.
2704
2705   If this device type has to be created on secondaries, create it and
2706   all its children.
2707
2708   If not, just recurse to children keeping the same 'force' value.
2709
2710   """
2711   if device.CreateOnSecondary():
2712     force = True
2713   if device.children:
2714     for child in device.children:
2715       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2716                                         child, force, info):
2717         return False
2718
2719   if not force:
2720     return True
2721   cfg.SetDiskID(device, node)
2722   new_id = rpc.call_blockdev_create(node, device, device.size,
2723                                     instance.name, False, info)
2724   if not new_id:
2725     return False
2726   if device.physical_id is None:
2727     device.physical_id = new_id
2728   return True
2729
2730
2731 def _GenerateUniqueNames(cfg, exts):
2732   """Generate a suitable LV name.
2733
2734   This will generate a logical volume name for the given instance.
2735
2736   """
2737   results = []
2738   for val in exts:
2739     new_id = cfg.GenerateUniqueID()
2740     results.append("%s%s" % (new_id, val))
2741   return results
2742
2743
2744 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2745   """Generate a drbd device complete with its children.
2746
2747   """
2748   port = cfg.AllocatePort()
2749   vgname = cfg.GetVGName()
2750   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2751                           logical_id=(vgname, names[0]))
2752   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2753                           logical_id=(vgname, names[1]))
2754   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2755                           logical_id = (primary, secondary, port),
2756                           children = [dev_data, dev_meta])
2757   return drbd_dev
2758
2759
2760 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2761   """Generate a drbd8 device complete with its children.
2762
2763   """
2764   port = cfg.AllocatePort()
2765   vgname = cfg.GetVGName()
2766   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2767                           logical_id=(vgname, names[0]))
2768   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2769                           logical_id=(vgname, names[1]))
2770   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2771                           logical_id = (primary, secondary, port),
2772                           children = [dev_data, dev_meta],
2773                           iv_name=iv_name)
2774   return drbd_dev
2775
2776
2777 def _GenerateDiskTemplate(cfg, template_name,
2778                           instance_name, primary_node,
2779                           secondary_nodes, disk_sz, swap_sz):
2780   """Generate the entire disk layout for a given template type.
2781
2782   """
2783   #TODO: compute space requirements
2784
2785   vgname = cfg.GetVGName()
2786   if template_name == constants.DT_DISKLESS:
2787     disks = []
2788   elif template_name == constants.DT_PLAIN:
2789     if len(secondary_nodes) != 0:
2790       raise errors.ProgrammerError("Wrong template configuration")
2791
2792     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2793     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2794                            logical_id=(vgname, names[0]),
2795                            iv_name = "sda")
2796     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2797                            logical_id=(vgname, names[1]),
2798                            iv_name = "sdb")
2799     disks = [sda_dev, sdb_dev]
2800   elif template_name == constants.DT_DRBD8:
2801     if len(secondary_nodes) != 1:
2802       raise errors.ProgrammerError("Wrong template configuration")
2803     remote_node = secondary_nodes[0]
2804     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2805                                        ".sdb_data", ".sdb_meta"])
2806     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2807                                          disk_sz, names[0:2], "sda")
2808     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2809                                          swap_sz, names[2:4], "sdb")
2810     disks = [drbd_sda_dev, drbd_sdb_dev]
2811   else:
2812     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2813   return disks
2814
2815
2816 def _GetInstanceInfoText(instance):
2817   """Compute that text that should be added to the disk's metadata.
2818
2819   """
2820   return "originstname+%s" % instance.name
2821
2822
2823 def _CreateDisks(cfg, instance):
2824   """Create all disks for an instance.
2825
2826   This abstracts away some work from AddInstance.
2827
2828   Args:
2829     instance: the instance object
2830
2831   Returns:
2832     True or False showing the success of the creation process
2833
2834   """
2835   info = _GetInstanceInfoText(instance)
2836
2837   for device in instance.disks:
2838     logger.Info("creating volume %s for instance %s" %
2839               (device.iv_name, instance.name))
2840     #HARDCODE
2841     for secondary_node in instance.secondary_nodes:
2842       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2843                                         device, False, info):
2844         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2845                      (device.iv_name, device, secondary_node))
2846         return False
2847     #HARDCODE
2848     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2849                                     instance, device, info):
2850       logger.Error("failed to create volume %s on primary!" %
2851                    device.iv_name)
2852       return False
2853   return True
2854
2855
2856 def _RemoveDisks(instance, cfg):
2857   """Remove all disks for an instance.
2858
2859   This abstracts away some work from `AddInstance()` and
2860   `RemoveInstance()`. Note that in case some of the devices couldn't
2861   be removed, the removal will continue with the other ones (compare
2862   with `_CreateDisks()`).
2863
2864   Args:
2865     instance: the instance object
2866
2867   Returns:
2868     True or False showing the success of the removal proces
2869
2870   """
2871   logger.Info("removing block devices for instance %s" % instance.name)
2872
2873   result = True
2874   for device in instance.disks:
2875     for node, disk in device.ComputeNodeTree(instance.primary_node):
2876       cfg.SetDiskID(disk, node)
2877       if not rpc.call_blockdev_remove(node, disk):
2878         logger.Error("could not remove block device %s on node %s,"
2879                      " continuing anyway" %
2880                      (device.iv_name, node))
2881         result = False
2882   return result
2883
2884
2885 class LUCreateInstance(LogicalUnit):
2886   """Create an instance.
2887
2888   """
2889   HPATH = "instance-add"
2890   HTYPE = constants.HTYPE_INSTANCE
2891   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2892               "disk_template", "swap_size", "mode", "start", "vcpus",
2893               "wait_for_sync", "ip_check", "mac"]
2894
2895   def BuildHooksEnv(self):
2896     """Build hooks env.
2897
2898     This runs on master, primary and secondary nodes of the instance.
2899
2900     """
2901     env = {
2902       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2903       "INSTANCE_DISK_SIZE": self.op.disk_size,
2904       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2905       "INSTANCE_ADD_MODE": self.op.mode,
2906       }
2907     if self.op.mode == constants.INSTANCE_IMPORT:
2908       env["INSTANCE_SRC_NODE"] = self.op.src_node
2909       env["INSTANCE_SRC_PATH"] = self.op.src_path
2910       env["INSTANCE_SRC_IMAGE"] = self.src_image
2911
2912     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2913       primary_node=self.op.pnode,
2914       secondary_nodes=self.secondaries,
2915       status=self.instance_status,
2916       os_type=self.op.os_type,
2917       memory=self.op.mem_size,
2918       vcpus=self.op.vcpus,
2919       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2920     ))
2921
2922     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2923           self.secondaries)
2924     return env, nl, nl
2925
2926
2927   def CheckPrereq(self):
2928     """Check prerequisites.
2929
2930     """
2931     for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2932       if not hasattr(self.op, attr):
2933         setattr(self.op, attr, None)
2934
2935     if self.op.mode not in (constants.INSTANCE_CREATE,
2936                             constants.INSTANCE_IMPORT):
2937       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2938                                  self.op.mode)
2939
2940     if (not self.cfg.GetVGName() and
2941         self.op.disk_template not in constants.DTS_NOT_LVM):
2942       raise errors.OpPrereqError("Cluster does not support lvm-based"
2943                                  " instances")
2944
2945     if self.op.mode == constants.INSTANCE_IMPORT:
2946       src_node = getattr(self.op, "src_node", None)
2947       src_path = getattr(self.op, "src_path", None)
2948       if src_node is None or src_path is None:
2949         raise errors.OpPrereqError("Importing an instance requires source"
2950                                    " node and path options")
2951       src_node_full = self.cfg.ExpandNodeName(src_node)
2952       if src_node_full is None:
2953         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2954       self.op.src_node = src_node = src_node_full
2955
2956       if not os.path.isabs(src_path):
2957         raise errors.OpPrereqError("The source path must be absolute")
2958
2959       export_info = rpc.call_export_info(src_node, src_path)
2960
2961       if not export_info:
2962         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2963
2964       if not export_info.has_section(constants.INISECT_EXP):
2965         raise errors.ProgrammerError("Corrupted export config")
2966
2967       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2968       if (int(ei_version) != constants.EXPORT_VERSION):
2969         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2970                                    (ei_version, constants.EXPORT_VERSION))
2971
2972       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2973         raise errors.OpPrereqError("Can't import instance with more than"
2974                                    " one data disk")
2975
2976       # FIXME: are the old os-es, disk sizes, etc. useful?
2977       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2978       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2979                                                          'disk0_dump'))
2980       self.src_image = diskimage
2981     else: # INSTANCE_CREATE
2982       if getattr(self.op, "os_type", None) is None:
2983         raise errors.OpPrereqError("No guest OS specified")
2984
2985     # check primary node
2986     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2987     if pnode is None:
2988       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2989                                  self.op.pnode)
2990     self.op.pnode = pnode.name
2991     self.pnode = pnode
2992     self.secondaries = []
2993     # disk template and mirror node verification
2994     if self.op.disk_template not in constants.DISK_TEMPLATES:
2995       raise errors.OpPrereqError("Invalid disk template name")
2996
2997     if self.op.disk_template in constants.DTS_NET_MIRROR:
2998       if getattr(self.op, "snode", None) is None:
2999         raise errors.OpPrereqError("The networked disk templates need"
3000                                    " a mirror node")
3001
3002       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3003       if snode_name is None:
3004         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3005                                    self.op.snode)
3006       elif snode_name == pnode.name:
3007         raise errors.OpPrereqError("The secondary node cannot be"
3008                                    " the primary node.")
3009       self.secondaries.append(snode_name)
3010
3011     # Required free disk space as a function of disk and swap space
3012     req_size_dict = {
3013       constants.DT_DISKLESS: None,
3014       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
3015       # 256 MB are added for drbd metadata, 128MB for each drbd device
3016       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
3017     }
3018
3019     if self.op.disk_template not in req_size_dict:
3020       raise errors.ProgrammerError("Disk template '%s' size requirement"
3021                                    " is unknown" %  self.op.disk_template)
3022
3023     req_size = req_size_dict[self.op.disk_template]
3024
3025     # Check lv size requirements
3026     if req_size is not None:
3027       nodenames = [pnode.name] + self.secondaries
3028       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3029       for node in nodenames:
3030         info = nodeinfo.get(node, None)
3031         if not info:
3032           raise errors.OpPrereqError("Cannot get current information"
3033                                      " from node '%s'" % nodeinfo)
3034         vg_free = info.get('vg_free', None)
3035         if not isinstance(vg_free, int):
3036           raise errors.OpPrereqError("Can't compute free disk space on"
3037                                      " node %s" % node)
3038         if req_size > info['vg_free']:
3039           raise errors.OpPrereqError("Not enough disk space on target node %s."
3040                                      " %d MB available, %d MB required" %
3041                                      (node, info['vg_free'], req_size))
3042
3043     # os verification
3044     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3045     if not os_obj:
3046       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3047                                  " primary node"  % self.op.os_type)
3048
3049     if self.op.kernel_path == constants.VALUE_NONE:
3050       raise errors.OpPrereqError("Can't set instance kernel to none")
3051
3052     # instance verification
3053     hostname1 = utils.HostInfo(self.op.instance_name)
3054
3055     self.op.instance_name = instance_name = hostname1.name
3056     instance_list = self.cfg.GetInstanceList()
3057     if instance_name in instance_list:
3058       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3059                                  instance_name)
3060
3061     ip = getattr(self.op, "ip", None)
3062     if ip is None or ip.lower() == "none":
3063       inst_ip = None
3064     elif ip.lower() == "auto":
3065       inst_ip = hostname1.ip
3066     else:
3067       if not utils.IsValidIP(ip):
3068         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3069                                    " like a valid IP" % ip)
3070       inst_ip = ip
3071     self.inst_ip = inst_ip
3072
3073     if self.op.start and not self.op.ip_check:
3074       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3075                                  " adding an instance in start mode")
3076
3077     if self.op.ip_check:
3078       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3079         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3080                                    (hostname1.ip, instance_name))
3081
3082     # MAC address verification
3083     if self.op.mac != "auto":
3084       if not utils.IsValidMac(self.op.mac.lower()):
3085         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3086                                    self.op.mac)
3087
3088     # bridge verification
3089     bridge = getattr(self.op, "bridge", None)
3090     if bridge is None:
3091       self.op.bridge = self.cfg.GetDefBridge()
3092     else:
3093       self.op.bridge = bridge
3094
3095     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3096       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3097                                  " destination node '%s'" %
3098                                  (self.op.bridge, pnode.name))
3099
3100     # boot order verification
3101     if self.op.hvm_boot_order is not None:
3102       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3103         raise errors.OpPrereqError("invalid boot order specified,"
3104                                    " must be one or more of [acdn]")
3105
3106     if self.op.start:
3107       self.instance_status = 'up'
3108     else:
3109       self.instance_status = 'down'
3110
3111   def Exec(self, feedback_fn):
3112     """Create and add the instance to the cluster.
3113
3114     """
3115     instance = self.op.instance_name
3116     pnode_name = self.pnode.name
3117
3118     if self.op.mac == "auto":
3119       mac_address = self.cfg.GenerateMAC()
3120     else:
3121       mac_address = self.op.mac
3122
3123     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3124     if self.inst_ip is not None:
3125       nic.ip = self.inst_ip
3126
3127     ht_kind = self.sstore.GetHypervisorType()
3128     if ht_kind in constants.HTS_REQ_PORT:
3129       network_port = self.cfg.AllocatePort()
3130     else:
3131       network_port = None
3132
3133     disks = _GenerateDiskTemplate(self.cfg,
3134                                   self.op.disk_template,
3135                                   instance, pnode_name,
3136                                   self.secondaries, self.op.disk_size,
3137                                   self.op.swap_size)
3138
3139     iobj = objects.Instance(name=instance, os=self.op.os_type,
3140                             primary_node=pnode_name,
3141                             memory=self.op.mem_size,
3142                             vcpus=self.op.vcpus,
3143                             nics=[nic], disks=disks,
3144                             disk_template=self.op.disk_template,
3145                             status=self.instance_status,
3146                             network_port=network_port,
3147                             kernel_path=self.op.kernel_path,
3148                             initrd_path=self.op.initrd_path,
3149                             hvm_boot_order=self.op.hvm_boot_order,
3150                             )
3151
3152     feedback_fn("* creating instance disks...")
3153     if not _CreateDisks(self.cfg, iobj):
3154       _RemoveDisks(iobj, self.cfg)
3155       raise errors.OpExecError("Device creation failed, reverting...")
3156
3157     feedback_fn("adding instance %s to cluster config" % instance)
3158
3159     self.cfg.AddInstance(iobj)
3160
3161     if self.op.wait_for_sync:
3162       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3163     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3164       # make sure the disks are not degraded (still sync-ing is ok)
3165       time.sleep(15)
3166       feedback_fn("* checking mirrors status")
3167       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3168     else:
3169       disk_abort = False
3170
3171     if disk_abort:
3172       _RemoveDisks(iobj, self.cfg)
3173       self.cfg.RemoveInstance(iobj.name)
3174       raise errors.OpExecError("There are some degraded disks for"
3175                                " this instance")
3176
3177     feedback_fn("creating os for instance %s on node %s" %
3178                 (instance, pnode_name))
3179
3180     if iobj.disk_template != constants.DT_DISKLESS:
3181       if self.op.mode == constants.INSTANCE_CREATE:
3182         feedback_fn("* running the instance OS create scripts...")
3183         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3184           raise errors.OpExecError("could not add os for instance %s"
3185                                    " on node %s" %
3186                                    (instance, pnode_name))
3187
3188       elif self.op.mode == constants.INSTANCE_IMPORT:
3189         feedback_fn("* running the instance OS import scripts...")
3190         src_node = self.op.src_node
3191         src_image = self.src_image
3192         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3193                                                 src_node, src_image):
3194           raise errors.OpExecError("Could not import os for instance"
3195                                    " %s on node %s" %
3196                                    (instance, pnode_name))
3197       else:
3198         # also checked in the prereq part
3199         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3200                                      % self.op.mode)
3201
3202     if self.op.start:
3203       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3204       feedback_fn("* starting instance...")
3205       if not rpc.call_instance_start(pnode_name, iobj, None):
3206         raise errors.OpExecError("Could not start instance")
3207
3208
3209 class LUConnectConsole(NoHooksLU):
3210   """Connect to an instance's console.
3211
3212   This is somewhat special in that it returns the command line that
3213   you need to run on the master node in order to connect to the
3214   console.
3215
3216   """
3217   _OP_REQP = ["instance_name"]
3218
3219   def CheckPrereq(self):
3220     """Check prerequisites.
3221
3222     This checks that the instance is in the cluster.
3223
3224     """
3225     instance = self.cfg.GetInstanceInfo(
3226       self.cfg.ExpandInstanceName(self.op.instance_name))
3227     if instance is None:
3228       raise errors.OpPrereqError("Instance '%s' not known" %
3229                                  self.op.instance_name)
3230     self.instance = instance
3231
3232   def Exec(self, feedback_fn):
3233     """Connect to the console of an instance
3234
3235     """
3236     instance = self.instance
3237     node = instance.primary_node
3238
3239     node_insts = rpc.call_instance_list([node])[node]
3240     if node_insts is False:
3241       raise errors.OpExecError("Can't connect to node %s." % node)
3242
3243     if instance.name not in node_insts:
3244       raise errors.OpExecError("Instance %s is not running." % instance.name)
3245
3246     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3247
3248     hyper = hypervisor.GetHypervisor()
3249     console_cmd = hyper.GetShellCommandForConsole(instance)
3250
3251     # build ssh cmdline
3252     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3253
3254
3255 class LUReplaceDisks(LogicalUnit):
3256   """Replace the disks of an instance.
3257
3258   """
3259   HPATH = "mirrors-replace"
3260   HTYPE = constants.HTYPE_INSTANCE
3261   _OP_REQP = ["instance_name", "mode", "disks"]
3262
3263   def BuildHooksEnv(self):
3264     """Build hooks env.
3265
3266     This runs on the master, the primary and all the secondaries.
3267
3268     """
3269     env = {
3270       "MODE": self.op.mode,
3271       "NEW_SECONDARY": self.op.remote_node,
3272       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3273       }
3274     env.update(_BuildInstanceHookEnvByObject(self.instance))
3275     nl = [
3276       self.sstore.GetMasterNode(),
3277       self.instance.primary_node,
3278       ]
3279     if self.op.remote_node is not None:
3280       nl.append(self.op.remote_node)
3281     return env, nl, nl
3282
3283   def CheckPrereq(self):
3284     """Check prerequisites.
3285
3286     This checks that the instance is in the cluster.
3287
3288     """
3289     instance = self.cfg.GetInstanceInfo(
3290       self.cfg.ExpandInstanceName(self.op.instance_name))
3291     if instance is None:
3292       raise errors.OpPrereqError("Instance '%s' not known" %
3293                                  self.op.instance_name)
3294     self.instance = instance
3295     self.op.instance_name = instance.name
3296
3297     if instance.disk_template not in constants.DTS_NET_MIRROR:
3298       raise errors.OpPrereqError("Instance's disk layout is not"
3299                                  " network mirrored.")
3300
3301     if len(instance.secondary_nodes) != 1:
3302       raise errors.OpPrereqError("The instance has a strange layout,"
3303                                  " expected one secondary but found %d" %
3304                                  len(instance.secondary_nodes))
3305
3306     self.sec_node = instance.secondary_nodes[0]
3307
3308     remote_node = getattr(self.op, "remote_node", None)
3309     if remote_node is not None:
3310       remote_node = self.cfg.ExpandNodeName(remote_node)
3311       if remote_node is None:
3312         raise errors.OpPrereqError("Node '%s' not known" %
3313                                    self.op.remote_node)
3314       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3315     else:
3316       self.remote_node_info = None
3317     if remote_node == instance.primary_node:
3318       raise errors.OpPrereqError("The specified node is the primary node of"
3319                                  " the instance.")
3320     elif remote_node == self.sec_node:
3321       if self.op.mode == constants.REPLACE_DISK_SEC:
3322         # this is for DRBD8, where we can't execute the same mode of
3323         # replacement as for drbd7 (no different port allocated)
3324         raise errors.OpPrereqError("Same secondary given, cannot execute"
3325                                    " replacement")
3326       # the user gave the current secondary, switch to
3327       # 'no-replace-secondary' mode for drbd7
3328       remote_node = None
3329     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3330         self.op.mode != constants.REPLACE_DISK_ALL):
3331       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3332                                  " disks replacement, not individual ones")
3333     if instance.disk_template == constants.DT_DRBD8:
3334       if (self.op.mode == constants.REPLACE_DISK_ALL and
3335           remote_node is not None):
3336         # switch to replace secondary mode
3337         self.op.mode = constants.REPLACE_DISK_SEC
3338
3339       if self.op.mode == constants.REPLACE_DISK_ALL:
3340         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3341                                    " secondary disk replacement, not"
3342                                    " both at once")
3343       elif self.op.mode == constants.REPLACE_DISK_PRI:
3344         if remote_node is not None:
3345           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3346                                      " the secondary while doing a primary"
3347                                      " node disk replacement")
3348         self.tgt_node = instance.primary_node
3349         self.oth_node = instance.secondary_nodes[0]
3350       elif self.op.mode == constants.REPLACE_DISK_SEC:
3351         self.new_node = remote_node # this can be None, in which case
3352                                     # we don't change the secondary
3353         self.tgt_node = instance.secondary_nodes[0]
3354         self.oth_node = instance.primary_node
3355       else:
3356         raise errors.ProgrammerError("Unhandled disk replace mode")
3357
3358     for name in self.op.disks:
3359       if instance.FindDisk(name) is None:
3360         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3361                                    (name, instance.name))
3362     self.op.remote_node = remote_node
3363
3364   def _ExecRR1(self, feedback_fn):
3365     """Replace the disks of an instance.
3366
3367     """
3368     instance = self.instance
3369     iv_names = {}
3370     # start of work
3371     if self.op.remote_node is None:
3372       remote_node = self.sec_node
3373     else:
3374       remote_node = self.op.remote_node
3375     cfg = self.cfg
3376     for dev in instance.disks:
3377       size = dev.size
3378       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3379       names = _GenerateUniqueNames(cfg, lv_names)
3380       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3381                                        remote_node, size, names)
3382       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3383       logger.Info("adding new mirror component on secondary for %s" %
3384                   dev.iv_name)
3385       #HARDCODE
3386       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3387                                         new_drbd, False,
3388                                         _GetInstanceInfoText(instance)):
3389         raise errors.OpExecError("Failed to create new component on secondary"
3390                                  " node %s. Full abort, cleanup manually!" %
3391                                  remote_node)
3392
3393       logger.Info("adding new mirror component on primary")
3394       #HARDCODE
3395       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3396                                       instance, new_drbd,
3397                                       _GetInstanceInfoText(instance)):
3398         # remove secondary dev
3399         cfg.SetDiskID(new_drbd, remote_node)
3400         rpc.call_blockdev_remove(remote_node, new_drbd)
3401         raise errors.OpExecError("Failed to create volume on primary!"
3402                                  " Full abort, cleanup manually!!")
3403
3404       # the device exists now
3405       # call the primary node to add the mirror to md
3406       logger.Info("adding new mirror component to md")
3407       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3408                                            [new_drbd]):
3409         logger.Error("Can't add mirror compoment to md!")
3410         cfg.SetDiskID(new_drbd, remote_node)
3411         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3412           logger.Error("Can't rollback on secondary")
3413         cfg.SetDiskID(new_drbd, instance.primary_node)
3414         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3415           logger.Error("Can't rollback on primary")
3416         raise errors.OpExecError("Full abort, cleanup manually!!")
3417
3418       dev.children.append(new_drbd)
3419       cfg.AddInstance(instance)
3420
3421     # this can fail as the old devices are degraded and _WaitForSync
3422     # does a combined result over all disks, so we don't check its
3423     # return value
3424     _WaitForSync(cfg, instance, self.proc, unlock=True)
3425
3426     # so check manually all the devices
3427     for name in iv_names:
3428       dev, child, new_drbd = iv_names[name]
3429       cfg.SetDiskID(dev, instance.primary_node)
3430       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3431       if is_degr:
3432         raise errors.OpExecError("MD device %s is degraded!" % name)
3433       cfg.SetDiskID(new_drbd, instance.primary_node)
3434       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3435       if is_degr:
3436         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3437
3438     for name in iv_names:
3439       dev, child, new_drbd = iv_names[name]
3440       logger.Info("remove mirror %s component" % name)
3441       cfg.SetDiskID(dev, instance.primary_node)
3442       if not rpc.call_blockdev_removechildren(instance.primary_node,
3443                                               dev, [child]):
3444         logger.Error("Can't remove child from mirror, aborting"
3445                      " *this device cleanup*.\nYou need to cleanup manually!!")
3446         continue
3447
3448       for node in child.logical_id[:2]:
3449         logger.Info("remove child device on %s" % node)
3450         cfg.SetDiskID(child, node)
3451         if not rpc.call_blockdev_remove(node, child):
3452           logger.Error("Warning: failed to remove device from node %s,"
3453                        " continuing operation." % node)
3454
3455       dev.children.remove(child)
3456
3457       cfg.AddInstance(instance)
3458
3459   def _ExecD8DiskOnly(self, feedback_fn):
3460     """Replace a disk on the primary or secondary for dbrd8.
3461
3462     The algorithm for replace is quite complicated:
3463       - for each disk to be replaced:
3464         - create new LVs on the target node with unique names
3465         - detach old LVs from the drbd device
3466         - rename old LVs to name_replaced.<time_t>
3467         - rename new LVs to old LVs
3468         - attach the new LVs (with the old names now) to the drbd device
3469       - wait for sync across all devices
3470       - for each modified disk:
3471         - remove old LVs (which have the name name_replaces.<time_t>)
3472
3473     Failures are not very well handled.
3474
3475     """
3476     steps_total = 6
3477     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3478     instance = self.instance
3479     iv_names = {}
3480     vgname = self.cfg.GetVGName()
3481     # start of work
3482     cfg = self.cfg
3483     tgt_node = self.tgt_node
3484     oth_node = self.oth_node
3485
3486     # Step: check device activation
3487     self.proc.LogStep(1, steps_total, "check device existence")
3488     info("checking volume groups")
3489     my_vg = cfg.GetVGName()
3490     results = rpc.call_vg_list([oth_node, tgt_node])
3491     if not results:
3492       raise errors.OpExecError("Can't list volume groups on the nodes")
3493     for node in oth_node, tgt_node:
3494       res = results.get(node, False)
3495       if not res or my_vg not in res:
3496         raise errors.OpExecError("Volume group '%s' not found on %s" %
3497                                  (my_vg, node))
3498     for dev in instance.disks:
3499       if not dev.iv_name in self.op.disks:
3500         continue
3501       for node in tgt_node, oth_node:
3502         info("checking %s on %s" % (dev.iv_name, node))
3503         cfg.SetDiskID(dev, node)
3504         if not rpc.call_blockdev_find(node, dev):
3505           raise errors.OpExecError("Can't find device %s on node %s" %
3506                                    (dev.iv_name, node))
3507
3508     # Step: check other node consistency
3509     self.proc.LogStep(2, steps_total, "check peer consistency")
3510     for dev in instance.disks:
3511       if not dev.iv_name in self.op.disks:
3512         continue
3513       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3514       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3515                                    oth_node==instance.primary_node):
3516         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3517                                  " to replace disks on this node (%s)" %
3518                                  (oth_node, tgt_node))
3519
3520     # Step: create new storage
3521     self.proc.LogStep(3, steps_total, "allocate new storage")
3522     for dev in instance.disks:
3523       if not dev.iv_name in self.op.disks:
3524         continue
3525       size = dev.size
3526       cfg.SetDiskID(dev, tgt_node)
3527       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3528       names = _GenerateUniqueNames(cfg, lv_names)
3529       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3530                              logical_id=(vgname, names[0]))
3531       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3532                              logical_id=(vgname, names[1]))
3533       new_lvs = [lv_data, lv_meta]
3534       old_lvs = dev.children
3535       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3536       info("creating new local storage on %s for %s" %
3537            (tgt_node, dev.iv_name))
3538       # since we *always* want to create this LV, we use the
3539       # _Create...OnPrimary (which forces the creation), even if we
3540       # are talking about the secondary node
3541       for new_lv in new_lvs:
3542         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3543                                         _GetInstanceInfoText(instance)):
3544           raise errors.OpExecError("Failed to create new LV named '%s' on"
3545                                    " node '%s'" %
3546                                    (new_lv.logical_id[1], tgt_node))
3547
3548     # Step: for each lv, detach+rename*2+attach
3549     self.proc.LogStep(4, steps_total, "change drbd configuration")
3550     for dev, old_lvs, new_lvs in iv_names.itervalues():
3551       info("detaching %s drbd from local storage" % dev.iv_name)
3552       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3553         raise errors.OpExecError("Can't detach drbd from local storage on node"
3554                                  " %s for device %s" % (tgt_node, dev.iv_name))
3555       #dev.children = []
3556       #cfg.Update(instance)
3557
3558       # ok, we created the new LVs, so now we know we have the needed
3559       # storage; as such, we proceed on the target node to rename
3560       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3561       # using the assumption that logical_id == physical_id (which in
3562       # turn is the unique_id on that node)
3563
3564       # FIXME(iustin): use a better name for the replaced LVs
3565       temp_suffix = int(time.time())
3566       ren_fn = lambda d, suff: (d.physical_id[0],
3567                                 d.physical_id[1] + "_replaced-%s" % suff)
3568       # build the rename list based on what LVs exist on the node
3569       rlist = []
3570       for to_ren in old_lvs:
3571         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3572         if find_res is not None: # device exists
3573           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3574
3575       info("renaming the old LVs on the target node")
3576       if not rpc.call_blockdev_rename(tgt_node, rlist):
3577         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3578       # now we rename the new LVs to the old LVs
3579       info("renaming the new LVs on the target node")
3580       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3581       if not rpc.call_blockdev_rename(tgt_node, rlist):
3582         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3583
3584       for old, new in zip(old_lvs, new_lvs):
3585         new.logical_id = old.logical_id
3586         cfg.SetDiskID(new, tgt_node)
3587
3588       for disk in old_lvs:
3589         disk.logical_id = ren_fn(disk, temp_suffix)
3590         cfg.SetDiskID(disk, tgt_node)
3591
3592       # now that the new lvs have the old name, we can add them to the device
3593       info("adding new mirror component on %s" % tgt_node)
3594       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3595         for new_lv in new_lvs:
3596           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3597             warning("Can't rollback device %s", hint="manually cleanup unused"
3598                     " logical volumes")
3599         raise errors.OpExecError("Can't add local storage to drbd")
3600
3601       dev.children = new_lvs
3602       cfg.Update(instance)
3603
3604     # Step: wait for sync
3605
3606     # this can fail as the old devices are degraded and _WaitForSync
3607     # does a combined result over all disks, so we don't check its
3608     # return value
3609     self.proc.LogStep(5, steps_total, "sync devices")
3610     _WaitForSync(cfg, instance, self.proc, unlock=True)
3611
3612     # so check manually all the devices
3613     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3614       cfg.SetDiskID(dev, instance.primary_node)
3615       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3616       if is_degr:
3617         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3618
3619     # Step: remove old storage
3620     self.proc.LogStep(6, steps_total, "removing old storage")
3621     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3622       info("remove logical volumes for %s" % name)
3623       for lv in old_lvs:
3624         cfg.SetDiskID(lv, tgt_node)
3625         if not rpc.call_blockdev_remove(tgt_node, lv):
3626           warning("Can't remove old LV", hint="manually remove unused LVs")
3627           continue
3628
3629   def _ExecD8Secondary(self, feedback_fn):
3630     """Replace the secondary node for drbd8.
3631
3632     The algorithm for replace is quite complicated:
3633       - for all disks of the instance:
3634         - create new LVs on the new node with same names
3635         - shutdown the drbd device on the old secondary
3636         - disconnect the drbd network on the primary
3637         - create the drbd device on the new secondary
3638         - network attach the drbd on the primary, using an artifice:
3639           the drbd code for Attach() will connect to the network if it
3640           finds a device which is connected to the good local disks but
3641           not network enabled
3642       - wait for sync across all devices
3643       - remove all disks from the old secondary
3644
3645     Failures are not very well handled.
3646
3647     """
3648     steps_total = 6
3649     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3650     instance = self.instance
3651     iv_names = {}
3652     vgname = self.cfg.GetVGName()
3653     # start of work
3654     cfg = self.cfg
3655     old_node = self.tgt_node
3656     new_node = self.new_node
3657     pri_node = instance.primary_node
3658
3659     # Step: check device activation
3660     self.proc.LogStep(1, steps_total, "check device existence")
3661     info("checking volume groups")
3662     my_vg = cfg.GetVGName()
3663     results = rpc.call_vg_list([pri_node, new_node])
3664     if not results:
3665       raise errors.OpExecError("Can't list volume groups on the nodes")
3666     for node in pri_node, new_node:
3667       res = results.get(node, False)
3668       if not res or my_vg not in res:
3669         raise errors.OpExecError("Volume group '%s' not found on %s" %
3670                                  (my_vg, node))
3671     for dev in instance.disks:
3672       if not dev.iv_name in self.op.disks:
3673         continue
3674       info("checking %s on %s" % (dev.iv_name, pri_node))
3675       cfg.SetDiskID(dev, pri_node)
3676       if not rpc.call_blockdev_find(pri_node, dev):
3677         raise errors.OpExecError("Can't find device %s on node %s" %
3678                                  (dev.iv_name, pri_node))
3679
3680     # Step: check other node consistency
3681     self.proc.LogStep(2, steps_total, "check peer consistency")
3682     for dev in instance.disks:
3683       if not dev.iv_name in self.op.disks:
3684         continue
3685       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3686       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3687         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3688                                  " unsafe to replace the secondary" %
3689                                  pri_node)
3690
3691     # Step: create new storage
3692     self.proc.LogStep(3, steps_total, "allocate new storage")
3693     for dev in instance.disks:
3694       size = dev.size
3695       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3696       # since we *always* want to create this LV, we use the
3697       # _Create...OnPrimary (which forces the creation), even if we
3698       # are talking about the secondary node
3699       for new_lv in dev.children:
3700         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3701                                         _GetInstanceInfoText(instance)):
3702           raise errors.OpExecError("Failed to create new LV named '%s' on"
3703                                    " node '%s'" %
3704                                    (new_lv.logical_id[1], new_node))
3705
3706       iv_names[dev.iv_name] = (dev, dev.children)
3707
3708     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3709     for dev in instance.disks:
3710       size = dev.size
3711       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3712       # create new devices on new_node
3713       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3714                               logical_id=(pri_node, new_node,
3715                                           dev.logical_id[2]),
3716                               children=dev.children)
3717       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3718                                         new_drbd, False,
3719                                       _GetInstanceInfoText(instance)):
3720         raise errors.OpExecError("Failed to create new DRBD on"
3721                                  " node '%s'" % new_node)
3722
3723     for dev in instance.disks:
3724       # we have new devices, shutdown the drbd on the old secondary
3725       info("shutting down drbd for %s on old node" % dev.iv_name)
3726       cfg.SetDiskID(dev, old_node)
3727       if not rpc.call_blockdev_shutdown(old_node, dev):
3728         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3729                 hint="Please cleanup this device manually as soon as possible")
3730
3731     info("detaching primary drbds from the network (=> standalone)")
3732     done = 0
3733     for dev in instance.disks:
3734       cfg.SetDiskID(dev, pri_node)
3735       # set the physical (unique in bdev terms) id to None, meaning
3736       # detach from network
3737       dev.physical_id = (None,) * len(dev.physical_id)
3738       # and 'find' the device, which will 'fix' it to match the
3739       # standalone state
3740       if rpc.call_blockdev_find(pri_node, dev):
3741         done += 1
3742       else:
3743         warning("Failed to detach drbd %s from network, unusual case" %
3744                 dev.iv_name)
3745
3746     if not done:
3747       # no detaches succeeded (very unlikely)
3748       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3749
3750     # if we managed to detach at least one, we update all the disks of
3751     # the instance to point to the new secondary
3752     info("updating instance configuration")
3753     for dev in instance.disks:
3754       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3755       cfg.SetDiskID(dev, pri_node)
3756     cfg.Update(instance)
3757
3758     # and now perform the drbd attach
3759     info("attaching primary drbds to new secondary (standalone => connected)")
3760     failures = []
3761     for dev in instance.disks:
3762       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3763       # since the attach is smart, it's enough to 'find' the device,
3764       # it will automatically activate the network, if the physical_id
3765       # is correct
3766       cfg.SetDiskID(dev, pri_node)
3767       if not rpc.call_blockdev_find(pri_node, dev):
3768         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3769                 "please do a gnt-instance info to see the status of disks")
3770
3771     # this can fail as the old devices are degraded and _WaitForSync
3772     # does a combined result over all disks, so we don't check its
3773     # return value
3774     self.proc.LogStep(5, steps_total, "sync devices")
3775     _WaitForSync(cfg, instance, self.proc, unlock=True)
3776
3777     # so check manually all the devices
3778     for name, (dev, old_lvs) in iv_names.iteritems():
3779       cfg.SetDiskID(dev, pri_node)
3780       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3781       if is_degr:
3782         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3783
3784     self.proc.LogStep(6, steps_total, "removing old storage")
3785     for name, (dev, old_lvs) in iv_names.iteritems():
3786       info("remove logical volumes for %s" % name)
3787       for lv in old_lvs:
3788         cfg.SetDiskID(lv, old_node)
3789         if not rpc.call_blockdev_remove(old_node, lv):
3790           warning("Can't remove LV on old secondary",
3791                   hint="Cleanup stale volumes by hand")
3792
3793   def Exec(self, feedback_fn):
3794     """Execute disk replacement.
3795
3796     This dispatches the disk replacement to the appropriate handler.
3797
3798     """
3799     instance = self.instance
3800     if instance.disk_template == constants.DT_REMOTE_RAID1:
3801       fn = self._ExecRR1
3802     elif instance.disk_template == constants.DT_DRBD8:
3803       if self.op.remote_node is None:
3804         fn = self._ExecD8DiskOnly
3805       else:
3806         fn = self._ExecD8Secondary
3807     else:
3808       raise errors.ProgrammerError("Unhandled disk replacement case")
3809     return fn(feedback_fn)
3810
3811
3812 class LUQueryInstanceData(NoHooksLU):
3813   """Query runtime instance data.
3814
3815   """
3816   _OP_REQP = ["instances"]
3817
3818   def CheckPrereq(self):
3819     """Check prerequisites.
3820
3821     This only checks the optional instance list against the existing names.
3822
3823     """
3824     if not isinstance(self.op.instances, list):
3825       raise errors.OpPrereqError("Invalid argument type 'instances'")
3826     if self.op.instances:
3827       self.wanted_instances = []
3828       names = self.op.instances
3829       for name in names:
3830         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3831         if instance is None:
3832           raise errors.OpPrereqError("No such instance name '%s'" % name)
3833         self.wanted_instances.append(instance)
3834     else:
3835       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3836                                in self.cfg.GetInstanceList()]
3837     return
3838
3839
3840   def _ComputeDiskStatus(self, instance, snode, dev):
3841     """Compute block device status.
3842
3843     """
3844     self.cfg.SetDiskID(dev, instance.primary_node)
3845     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3846     if dev.dev_type in constants.LDS_DRBD:
3847       # we change the snode then (otherwise we use the one passed in)
3848       if dev.logical_id[0] == instance.primary_node:
3849         snode = dev.logical_id[1]
3850       else:
3851         snode = dev.logical_id[0]
3852
3853     if snode:
3854       self.cfg.SetDiskID(dev, snode)
3855       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3856     else:
3857       dev_sstatus = None
3858
3859     if dev.children:
3860       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3861                       for child in dev.children]
3862     else:
3863       dev_children = []
3864
3865     data = {
3866       "iv_name": dev.iv_name,
3867       "dev_type": dev.dev_type,
3868       "logical_id": dev.logical_id,
3869       "physical_id": dev.physical_id,
3870       "pstatus": dev_pstatus,
3871       "sstatus": dev_sstatus,
3872       "children": dev_children,
3873       }
3874
3875     return data
3876
3877   def Exec(self, feedback_fn):
3878     """Gather and return data"""
3879     result = {}
3880     for instance in self.wanted_instances:
3881       remote_info = rpc.call_instance_info(instance.primary_node,
3882                                                 instance.name)
3883       if remote_info and "state" in remote_info:
3884         remote_state = "up"
3885       else:
3886         remote_state = "down"
3887       if instance.status == "down":
3888         config_state = "down"
3889       else:
3890         config_state = "up"
3891
3892       disks = [self._ComputeDiskStatus(instance, None, device)
3893                for device in instance.disks]
3894
3895       idict = {
3896         "name": instance.name,
3897         "config_state": config_state,
3898         "run_state": remote_state,
3899         "pnode": instance.primary_node,
3900         "snodes": instance.secondary_nodes,
3901         "os": instance.os,
3902         "memory": instance.memory,
3903         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3904         "disks": disks,
3905         "network_port": instance.network_port,
3906         "vcpus": instance.vcpus,
3907         "kernel_path": instance.kernel_path,
3908         "initrd_path": instance.initrd_path,
3909         "hvm_boot_order": instance.hvm_boot_order,
3910         }
3911
3912       result[instance.name] = idict
3913
3914     return result
3915
3916
3917 class LUSetInstanceParams(LogicalUnit):
3918   """Modifies an instances's parameters.
3919
3920   """
3921   HPATH = "instance-modify"
3922   HTYPE = constants.HTYPE_INSTANCE
3923   _OP_REQP = ["instance_name"]
3924
3925   def BuildHooksEnv(self):
3926     """Build hooks env.
3927
3928     This runs on the master, primary and secondaries.
3929
3930     """
3931     args = dict()
3932     if self.mem:
3933       args['memory'] = self.mem
3934     if self.vcpus:
3935       args['vcpus'] = self.vcpus
3936     if self.do_ip or self.do_bridge or self.mac:
3937       if self.do_ip:
3938         ip = self.ip
3939       else:
3940         ip = self.instance.nics[0].ip
3941       if self.bridge:
3942         bridge = self.bridge
3943       else:
3944         bridge = self.instance.nics[0].bridge
3945       if self.mac:
3946         mac = self.mac
3947       else:
3948         mac = self.instance.nics[0].mac
3949       args['nics'] = [(ip, bridge, mac)]
3950     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
3951     nl = [self.sstore.GetMasterNode(),
3952           self.instance.primary_node] + list(self.instance.secondary_nodes)
3953     return env, nl, nl
3954
3955   def CheckPrereq(self):
3956     """Check prerequisites.
3957
3958     This only checks the instance list against the existing names.
3959
3960     """
3961     self.mem = getattr(self.op, "mem", None)
3962     self.vcpus = getattr(self.op, "vcpus", None)
3963     self.ip = getattr(self.op, "ip", None)
3964     self.mac = getattr(self.op, "mac", None)
3965     self.bridge = getattr(self.op, "bridge", None)
3966     self.kernel_path = getattr(self.op, "kernel_path", None)
3967     self.initrd_path = getattr(self.op, "initrd_path", None)
3968     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
3969     all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
3970                   self.kernel_path, self.initrd_path, self.hvm_boot_order]
3971     if all_params.count(None) == len(all_params):
3972       raise errors.OpPrereqError("No changes submitted")
3973     if self.mem is not None:
3974       try:
3975         self.mem = int(self.mem)
3976       except ValueError, err:
3977         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
3978     if self.vcpus is not None:
3979       try:
3980         self.vcpus = int(self.vcpus)
3981       except ValueError, err:
3982         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
3983     if self.ip is not None:
3984       self.do_ip = True
3985       if self.ip.lower() == "none":
3986         self.ip = None
3987       else:
3988         if not utils.IsValidIP(self.ip):
3989           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
3990     else:
3991       self.do_ip = False
3992     self.do_bridge = (self.bridge is not None)
3993     if self.mac is not None:
3994       if self.cfg.IsMacInUse(self.mac):
3995         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
3996                                    self.mac)
3997       if not utils.IsValidMac(self.mac):
3998         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
3999
4000     if self.kernel_path is not None:
4001       self.do_kernel_path = True
4002       if self.kernel_path == constants.VALUE_NONE:
4003         raise errors.OpPrereqError("Can't set instance to no kernel")
4004
4005       if self.kernel_path != constants.VALUE_DEFAULT:
4006         if not os.path.isabs(self.kernel_path):
4007           raise errors.OpPrereqError("The kernel path must be an absolute"
4008                                     " filename")
4009     else:
4010       self.do_kernel_path = False
4011
4012     if self.initrd_path is not None:
4013       self.do_initrd_path = True
4014       if self.initrd_path not in (constants.VALUE_NONE,
4015                                   constants.VALUE_DEFAULT):
4016         if not os.path.isabs(self.initrd_path):
4017           raise errors.OpPrereqError("The initrd path must be an absolute"
4018                                     " filename")
4019     else:
4020       self.do_initrd_path = False
4021
4022     # boot order verification
4023     if self.hvm_boot_order is not None:
4024       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4025         if len(self.hvm_boot_order.strip("acdn")) != 0:
4026           raise errors.OpPrereqError("invalid boot order specified,"
4027                                      " must be one or more of [acdn]"
4028                                      " or 'default'")
4029
4030     instance = self.cfg.GetInstanceInfo(
4031       self.cfg.ExpandInstanceName(self.op.instance_name))
4032     if instance is None:
4033       raise errors.OpPrereqError("No such instance name '%s'" %
4034                                  self.op.instance_name)
4035     self.op.instance_name = instance.name
4036     self.instance = instance
4037     return
4038
4039   def Exec(self, feedback_fn):
4040     """Modifies an instance.
4041
4042     All parameters take effect only at the next restart of the instance.
4043     """
4044     result = []
4045     instance = self.instance
4046     if self.mem:
4047       instance.memory = self.mem
4048       result.append(("mem", self.mem))
4049     if self.vcpus:
4050       instance.vcpus = self.vcpus
4051       result.append(("vcpus",  self.vcpus))
4052     if self.do_ip:
4053       instance.nics[0].ip = self.ip
4054       result.append(("ip", self.ip))
4055     if self.bridge:
4056       instance.nics[0].bridge = self.bridge
4057       result.append(("bridge", self.bridge))
4058     if self.mac:
4059       instance.nics[0].mac = self.mac
4060       result.append(("mac", self.mac))
4061     if self.do_kernel_path:
4062       instance.kernel_path = self.kernel_path
4063       result.append(("kernel_path", self.kernel_path))
4064     if self.do_initrd_path:
4065       instance.initrd_path = self.initrd_path
4066       result.append(("initrd_path", self.initrd_path))
4067     if self.hvm_boot_order:
4068       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4069         instance.hvm_boot_order = None
4070       else:
4071         instance.hvm_boot_order = self.hvm_boot_order
4072       result.append(("hvm_boot_order", self.hvm_boot_order))
4073
4074     self.cfg.AddInstance(instance)
4075
4076     return result
4077
4078
4079 class LUQueryExports(NoHooksLU):
4080   """Query the exports list
4081
4082   """
4083   _OP_REQP = []
4084
4085   def CheckPrereq(self):
4086     """Check that the nodelist contains only existing nodes.
4087
4088     """
4089     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4090
4091   def Exec(self, feedback_fn):
4092     """Compute the list of all the exported system images.
4093
4094     Returns:
4095       a dictionary with the structure node->(export-list)
4096       where export-list is a list of the instances exported on
4097       that node.
4098
4099     """
4100     return rpc.call_export_list(self.nodes)
4101
4102
4103 class LUExportInstance(LogicalUnit):
4104   """Export an instance to an image in the cluster.
4105
4106   """
4107   HPATH = "instance-export"
4108   HTYPE = constants.HTYPE_INSTANCE
4109   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4110
4111   def BuildHooksEnv(self):
4112     """Build hooks env.
4113
4114     This will run on the master, primary node and target node.
4115
4116     """
4117     env = {
4118       "EXPORT_NODE": self.op.target_node,
4119       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4120       }
4121     env.update(_BuildInstanceHookEnvByObject(self.instance))
4122     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4123           self.op.target_node]
4124     return env, nl, nl
4125
4126   def CheckPrereq(self):
4127     """Check prerequisites.
4128
4129     This checks that the instance name is a valid one.
4130
4131     """
4132     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4133     self.instance = self.cfg.GetInstanceInfo(instance_name)
4134     if self.instance is None:
4135       raise errors.OpPrereqError("Instance '%s' not found" %
4136                                  self.op.instance_name)
4137
4138     # node verification
4139     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4140     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4141
4142     if self.dst_node is None:
4143       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4144                                  self.op.target_node)
4145     self.op.target_node = self.dst_node.name
4146
4147   def Exec(self, feedback_fn):
4148     """Export an instance to an image in the cluster.
4149
4150     """
4151     instance = self.instance
4152     dst_node = self.dst_node
4153     src_node = instance.primary_node
4154     if self.op.shutdown:
4155       # shutdown the instance, but not the disks
4156       if not rpc.call_instance_shutdown(src_node, instance):
4157          raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4158                                  (instance.name, source_node))
4159
4160     vgname = self.cfg.GetVGName()
4161
4162     snap_disks = []
4163
4164     try:
4165       for disk in instance.disks:
4166         if disk.iv_name == "sda":
4167           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4168           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4169
4170           if not new_dev_name:
4171             logger.Error("could not snapshot block device %s on node %s" %
4172                          (disk.logical_id[1], src_node))
4173           else:
4174             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4175                                       logical_id=(vgname, new_dev_name),
4176                                       physical_id=(vgname, new_dev_name),
4177                                       iv_name=disk.iv_name)
4178             snap_disks.append(new_dev)
4179
4180     finally:
4181       if self.op.shutdown and instance.status == "up":
4182         if not rpc.call_instance_start(src_node, instance, None):
4183           _ShutdownInstanceDisks(instance, self.cfg)
4184           raise errors.OpExecError("Could not start instance")
4185
4186     # TODO: check for size
4187
4188     for dev in snap_disks:
4189       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4190                                            instance):
4191         logger.Error("could not export block device %s from node"
4192                      " %s to node %s" %
4193                      (dev.logical_id[1], src_node, dst_node.name))
4194       if not rpc.call_blockdev_remove(src_node, dev):
4195         logger.Error("could not remove snapshot block device %s from"
4196                      " node %s" % (dev.logical_id[1], src_node))
4197
4198     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4199       logger.Error("could not finalize export for instance %s on node %s" %
4200                    (instance.name, dst_node.name))
4201
4202     nodelist = self.cfg.GetNodeList()
4203     nodelist.remove(dst_node.name)
4204
4205     # on one-node clusters nodelist will be empty after the removal
4206     # if we proceed the backup would be removed because OpQueryExports
4207     # substitutes an empty list with the full cluster node list.
4208     if nodelist:
4209       op = opcodes.OpQueryExports(nodes=nodelist)
4210       exportlist = self.proc.ChainOpCode(op)
4211       for node in exportlist:
4212         if instance.name in exportlist[node]:
4213           if not rpc.call_export_remove(node, instance.name):
4214             logger.Error("could not remove older export for instance %s"
4215                          " on node %s" % (instance.name, node))
4216
4217
4218 class TagsLU(NoHooksLU):
4219   """Generic tags LU.
4220
4221   This is an abstract class which is the parent of all the other tags LUs.
4222
4223   """
4224   def CheckPrereq(self):
4225     """Check prerequisites.
4226
4227     """
4228     if self.op.kind == constants.TAG_CLUSTER:
4229       self.target = self.cfg.GetClusterInfo()
4230     elif self.op.kind == constants.TAG_NODE:
4231       name = self.cfg.ExpandNodeName(self.op.name)
4232       if name is None:
4233         raise errors.OpPrereqError("Invalid node name (%s)" %
4234                                    (self.op.name,))
4235       self.op.name = name
4236       self.target = self.cfg.GetNodeInfo(name)
4237     elif self.op.kind == constants.TAG_INSTANCE:
4238       name = self.cfg.ExpandInstanceName(self.op.name)
4239       if name is None:
4240         raise errors.OpPrereqError("Invalid instance name (%s)" %
4241                                    (self.op.name,))
4242       self.op.name = name
4243       self.target = self.cfg.GetInstanceInfo(name)
4244     else:
4245       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4246                                  str(self.op.kind))
4247
4248
4249 class LUGetTags(TagsLU):
4250   """Returns the tags of a given object.
4251
4252   """
4253   _OP_REQP = ["kind", "name"]
4254
4255   def Exec(self, feedback_fn):
4256     """Returns the tag list.
4257
4258     """
4259     return self.target.GetTags()
4260
4261
4262 class LUSearchTags(NoHooksLU):
4263   """Searches the tags for a given pattern.
4264
4265   """
4266   _OP_REQP = ["pattern"]
4267
4268   def CheckPrereq(self):
4269     """Check prerequisites.
4270
4271     This checks the pattern passed for validity by compiling it.
4272
4273     """
4274     try:
4275       self.re = re.compile(self.op.pattern)
4276     except re.error, err:
4277       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4278                                  (self.op.pattern, err))
4279
4280   def Exec(self, feedback_fn):
4281     """Returns the tag list.
4282
4283     """
4284     cfg = self.cfg
4285     tgts = [("/cluster", cfg.GetClusterInfo())]
4286     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4287     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4288     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4289     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4290     results = []
4291     for path, target in tgts:
4292       for tag in target.GetTags():
4293         if self.re.search(tag):
4294           results.append((path, tag))
4295     return results
4296
4297
4298 class LUAddTags(TagsLU):
4299   """Sets a tag on a given object.
4300
4301   """
4302   _OP_REQP = ["kind", "name", "tags"]
4303
4304   def CheckPrereq(self):
4305     """Check prerequisites.
4306
4307     This checks the type and length of the tag name and value.
4308
4309     """
4310     TagsLU.CheckPrereq(self)
4311     for tag in self.op.tags:
4312       objects.TaggableObject.ValidateTag(tag)
4313
4314   def Exec(self, feedback_fn):
4315     """Sets the tag.
4316
4317     """
4318     try:
4319       for tag in self.op.tags:
4320         self.target.AddTag(tag)
4321     except errors.TagError, err:
4322       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4323     try:
4324       self.cfg.Update(self.target)
4325     except errors.ConfigurationError:
4326       raise errors.OpRetryError("There has been a modification to the"
4327                                 " config file and the operation has been"
4328                                 " aborted. Please retry.")
4329
4330
4331 class LUDelTags(TagsLU):
4332   """Delete a list of tags from a given object.
4333
4334   """
4335   _OP_REQP = ["kind", "name", "tags"]
4336
4337   def CheckPrereq(self):
4338     """Check prerequisites.
4339
4340     This checks that we have the given tag.
4341
4342     """
4343     TagsLU.CheckPrereq(self)
4344     for tag in self.op.tags:
4345       objects.TaggableObject.ValidateTag(tag)
4346     del_tags = frozenset(self.op.tags)
4347     cur_tags = self.target.GetTags()
4348     if not del_tags <= cur_tags:
4349       diff_tags = del_tags - cur_tags
4350       diff_names = ["'%s'" % tag for tag in diff_tags]
4351       diff_names.sort()
4352       raise errors.OpPrereqError("Tag(s) %s not found" %
4353                                  (",".join(diff_names)))
4354
4355   def Exec(self, feedback_fn):
4356     """Remove the tag from the object.
4357
4358     """
4359     for tag in self.op.tags:
4360       self.target.RemoveTag(tag)
4361     try:
4362       self.cfg.Update(self.target)
4363     except errors.ConfigurationError:
4364       raise errors.OpRetryError("There has been a modification to the"
4365                                 " config file and the operation has been"
4366                                 " aborted. Please retry.")
4367
4368 class LUTestDelay(NoHooksLU):
4369   """Sleep for a specified amount of time.
4370
4371   This LU sleeps on the master and/or nodes for a specified amoutn of
4372   time.
4373
4374   """
4375   _OP_REQP = ["duration", "on_master", "on_nodes"]
4376
4377   def CheckPrereq(self):
4378     """Check prerequisites.
4379
4380     This checks that we have a good list of nodes and/or the duration
4381     is valid.
4382
4383     """
4384
4385     if self.op.on_nodes:
4386       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4387
4388   def Exec(self, feedback_fn):
4389     """Do the actual sleep.
4390
4391     """
4392     if self.op.on_master:
4393       if not utils.TestDelay(self.op.duration):
4394         raise errors.OpExecError("Error during master delay test")
4395     if self.op.on_nodes:
4396       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4397       if not result:
4398         raise errors.OpExecError("Complete failure from rpc call")
4399       for node, node_result in result.items():
4400         if not node_result:
4401           raise errors.OpExecError("Failure during rpc call to node %s,"
4402                                    " result: %s" % (node, node_result))