Allow instance MAC address to be set.
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45
46 class LogicalUnit(object):
47   """Logical Unit base class.
48
49   Subclasses must follow these rules:
50     - implement CheckPrereq which also fills in the opcode instance
51       with all the fields (even if as None)
52     - implement Exec
53     - implement BuildHooksEnv
54     - redefine HPATH and HTYPE
55     - optionally redefine their run requirements (REQ_CLUSTER,
56       REQ_MASTER); note that all commands require root permissions
57
58   """
59   HPATH = None
60   HTYPE = None
61   _OP_REQP = []
62   REQ_CLUSTER = True
63   REQ_MASTER = True
64
65   def __init__(self, processor, op, cfg, sstore):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = cfg
75     self.sstore = sstore
76     for attr_name in self._OP_REQP:
77       attr_val = getattr(op, attr_name, None)
78       if attr_val is None:
79         raise errors.OpPrereqError("Required parameter '%s' missing" %
80                                    attr_name)
81     if self.REQ_CLUSTER:
82       if not cfg.IsCluster():
83         raise errors.OpPrereqError("Cluster not initialized yet,"
84                                    " use 'gnt-cluster init' first.")
85       if self.REQ_MASTER:
86         master = sstore.GetMasterNode()
87         if master != utils.HostInfo().name:
88           raise errors.OpPrereqError("Commands must be run on the master"
89                                      " node %s" % master)
90
91   def CheckPrereq(self):
92     """Check prerequisites for this LU.
93
94     This method should check that the prerequisites for the execution
95     of this LU are fulfilled. It can do internode communication, but
96     it should be idempotent - no cluster or system changes are
97     allowed.
98
99     The method should raise errors.OpPrereqError in case something is
100     not fulfilled. Its return value is ignored.
101
102     This method should also update all the parameters of the opcode to
103     their canonical form; e.g. a short node name must be fully
104     expanded after this method has successfully completed (so that
105     hooks, logging, etc. work correctly).
106
107     """
108     raise NotImplementedError
109
110   def Exec(self, feedback_fn):
111     """Execute the LU.
112
113     This method should implement the actual work. It should raise
114     errors.OpExecError for failures that are somewhat dealt with in
115     code, or expected.
116
117     """
118     raise NotImplementedError
119
120   def BuildHooksEnv(self):
121     """Build hooks environment for this LU.
122
123     This method should return a three-node tuple consisting of: a dict
124     containing the environment that will be used for running the
125     specific hook for this LU, a list of node names on which the hook
126     should run before the execution, and a list of node names on which
127     the hook should run after the execution.
128
129     The keys of the dict must not have 'GANETI_' prefixed as this will
130     be handled in the hooks runner. Also note additional keys will be
131     added by the hooks runner. If the LU doesn't define any
132     environment, an empty dict (and not None) should be returned.
133
134     As for the node lists, the master should not be included in the
135     them, as it will be added by the hooks runner in case this LU
136     requires a cluster to run on (otherwise we don't have a node
137     list). No nodes should be returned as an empty list (and not
138     None).
139
140     Note that if the HPATH for a LU class is None, this function will
141     not be called.
142
143     """
144     raise NotImplementedError
145
146
147 class NoHooksLU(LogicalUnit):
148   """Simple LU which runs no hooks.
149
150   This LU is intended as a parent for other LogicalUnits which will
151   run no hooks, in order to reduce duplicate code.
152
153   """
154   HPATH = None
155   HTYPE = None
156
157   def BuildHooksEnv(self):
158     """Build hooks env.
159
160     This is a no-op, since we don't run hooks.
161
162     """
163     return {}, [], []
164
165
166 def _AddHostToEtcHosts(hostname):
167   """Wrapper around utils.SetEtcHostsEntry.
168
169   """
170   hi = utils.HostInfo(name=hostname)
171   utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
172
173
174 def _RemoveHostFromEtcHosts(hostname):
175   """Wrapper around utils.RemoveEtcHostsEntry.
176
177   """
178   hi = utils.HostInfo(name=hostname)
179   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
180   utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
181
182
183 def _GetWantedNodes(lu, nodes):
184   """Returns list of checked and expanded node names.
185
186   Args:
187     nodes: List of nodes (strings) or None for all
188
189   """
190   if not isinstance(nodes, list):
191     raise errors.OpPrereqError("Invalid argument type 'nodes'")
192
193   if nodes:
194     wanted = []
195
196     for name in nodes:
197       node = lu.cfg.ExpandNodeName(name)
198       if node is None:
199         raise errors.OpPrereqError("No such node name '%s'" % name)
200       wanted.append(node)
201
202   else:
203     wanted = lu.cfg.GetNodeList()
204   return utils.NiceSort(wanted)
205
206
207 def _GetWantedInstances(lu, instances):
208   """Returns list of checked and expanded instance names.
209
210   Args:
211     instances: List of instances (strings) or None for all
212
213   """
214   if not isinstance(instances, list):
215     raise errors.OpPrereqError("Invalid argument type 'instances'")
216
217   if instances:
218     wanted = []
219
220     for name in instances:
221       instance = lu.cfg.ExpandInstanceName(name)
222       if instance is None:
223         raise errors.OpPrereqError("No such instance name '%s'" % name)
224       wanted.append(instance)
225
226   else:
227     wanted = lu.cfg.GetInstanceList()
228   return utils.NiceSort(wanted)
229
230
231 def _CheckOutputFields(static, dynamic, selected):
232   """Checks whether all selected fields are valid.
233
234   Args:
235     static: Static fields
236     dynamic: Dynamic fields
237
238   """
239   static_fields = frozenset(static)
240   dynamic_fields = frozenset(dynamic)
241
242   all_fields = static_fields | dynamic_fields
243
244   if not all_fields.issuperset(selected):
245     raise errors.OpPrereqError("Unknown output fields selected: %s"
246                                % ",".join(frozenset(selected).
247                                           difference(all_fields)))
248
249
250 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
251                           memory, vcpus, nics):
252   """Builds instance related env variables for hooks from single variables.
253
254   Args:
255     secondary_nodes: List of secondary nodes as strings
256   """
257   env = {
258     "OP_TARGET": name,
259     "INSTANCE_NAME": name,
260     "INSTANCE_PRIMARY": primary_node,
261     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
262     "INSTANCE_OS_TYPE": os_type,
263     "INSTANCE_STATUS": status,
264     "INSTANCE_MEMORY": memory,
265     "INSTANCE_VCPUS": vcpus,
266   }
267
268   if nics:
269     nic_count = len(nics)
270     for idx, (ip, bridge) in enumerate(nics):
271       if ip is None:
272         ip = ""
273       env["INSTANCE_NIC%d_IP" % idx] = ip
274       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
275   else:
276     nic_count = 0
277
278   env["INSTANCE_NIC_COUNT"] = nic_count
279
280   return env
281
282
283 def _BuildInstanceHookEnvByObject(instance, override=None):
284   """Builds instance related env variables for hooks from an object.
285
286   Args:
287     instance: objects.Instance object of instance
288     override: dict of values to override
289   """
290   args = {
291     'name': instance.name,
292     'primary_node': instance.primary_node,
293     'secondary_nodes': instance.secondary_nodes,
294     'os_type': instance.os,
295     'status': instance.os,
296     'memory': instance.memory,
297     'vcpus': instance.vcpus,
298     'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
299   }
300   if override:
301     args.update(override)
302   return _BuildInstanceHookEnv(**args)
303
304
305 def _UpdateKnownHosts(fullnode, ip, pubkey):
306   """Ensure a node has a correct known_hosts entry.
307
308   Args:
309     fullnode - Fully qualified domain name of host. (str)
310     ip       - IPv4 address of host (str)
311     pubkey   - the public key of the cluster
312
313   """
314   if os.path.exists(constants.SSH_KNOWN_HOSTS_FILE):
315     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'r+')
316   else:
317     f = open(constants.SSH_KNOWN_HOSTS_FILE, 'w+')
318
319   inthere = False
320
321   save_lines = []
322   add_lines = []
323   removed = False
324
325   for rawline in f:
326     logger.Debug('read %s' % (repr(rawline),))
327
328     parts = rawline.rstrip('\r\n').split()
329
330     # Ignore unwanted lines
331     if len(parts) >= 3 and not rawline.lstrip()[0] == '#':
332       fields = parts[0].split(',')
333       key = parts[2]
334
335       haveall = True
336       havesome = False
337       for spec in [ ip, fullnode ]:
338         if spec not in fields:
339           haveall = False
340         if spec in fields:
341           havesome = True
342
343       logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
344       if haveall and key == pubkey:
345         inthere = True
346         save_lines.append(rawline)
347         logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
348         continue
349
350       if havesome and (not haveall or key != pubkey):
351         removed = True
352         logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
353         continue
354
355     save_lines.append(rawline)
356
357   if not inthere:
358     add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
359     logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
360
361   if removed:
362     save_lines = save_lines + add_lines
363
364     # Write a new file and replace old.
365     fd, tmpname = tempfile.mkstemp('.tmp', 'known_hosts.',
366                                    constants.DATA_DIR)
367     newfile = os.fdopen(fd, 'w')
368     try:
369       newfile.write(''.join(save_lines))
370     finally:
371       newfile.close()
372     logger.Debug("Wrote new known_hosts.")
373     os.rename(tmpname, constants.SSH_KNOWN_HOSTS_FILE)
374
375   elif add_lines:
376     # Simply appending a new line will do the trick.
377     f.seek(0, 2)
378     for add in add_lines:
379       f.write(add)
380
381   f.close()
382
383
384 def _HasValidVG(vglist, vgname):
385   """Checks if the volume group list is valid.
386
387   A non-None return value means there's an error, and the return value
388   is the error message.
389
390   """
391   vgsize = vglist.get(vgname, None)
392   if vgsize is None:
393     return "volume group '%s' missing" % vgname
394   elif vgsize < 20480:
395     return ("volume group '%s' too small (20480MiB required, %dMib found)" %
396             (vgname, vgsize))
397   return None
398
399
400 def _InitSSHSetup(node):
401   """Setup the SSH configuration for the cluster.
402
403
404   This generates a dsa keypair for root, adds the pub key to the
405   permitted hosts and adds the hostkey to its own known hosts.
406
407   Args:
408     node: the name of this host as a fqdn
409
410   """
411   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
412
413   for name in priv_key, pub_key:
414     if os.path.exists(name):
415       utils.CreateBackup(name)
416     utils.RemoveFile(name)
417
418   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
419                          "-f", priv_key,
420                          "-q", "-N", ""])
421   if result.failed:
422     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
423                              result.output)
424
425   f = open(pub_key, 'r')
426   try:
427     utils.AddAuthorizedKey(auth_keys, f.read(8192))
428   finally:
429     f.close()
430
431
432 def _InitGanetiServerSetup(ss):
433   """Setup the necessary configuration for the initial node daemon.
434
435   This creates the nodepass file containing the shared password for
436   the cluster and also generates the SSL certificate.
437
438   """
439   # Create pseudo random password
440   randpass = sha.new(os.urandom(64)).hexdigest()
441   # and write it into sstore
442   ss.SetKey(ss.SS_NODED_PASS, randpass)
443
444   result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
445                          "-days", str(365*5), "-nodes", "-x509",
446                          "-keyout", constants.SSL_CERT_FILE,
447                          "-out", constants.SSL_CERT_FILE, "-batch"])
448   if result.failed:
449     raise errors.OpExecError("could not generate server ssl cert, command"
450                              " %s had exitcode %s and error message %s" %
451                              (result.cmd, result.exit_code, result.output))
452
453   os.chmod(constants.SSL_CERT_FILE, 0400)
454
455   result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
456
457   if result.failed:
458     raise errors.OpExecError("Could not start the node daemon, command %s"
459                              " had exitcode %s and error %s" %
460                              (result.cmd, result.exit_code, result.output))
461
462
463 def _CheckInstanceBridgesExist(instance):
464   """Check that the brigdes needed by an instance exist.
465
466   """
467   # check bridges existance
468   brlist = [nic.bridge for nic in instance.nics]
469   if not rpc.call_bridges_exist(instance.primary_node, brlist):
470     raise errors.OpPrereqError("one or more target bridges %s does not"
471                                " exist on destination node '%s'" %
472                                (brlist, instance.primary_node))
473
474
475 class LUInitCluster(LogicalUnit):
476   """Initialise the cluster.
477
478   """
479   HPATH = "cluster-init"
480   HTYPE = constants.HTYPE_CLUSTER
481   _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
482               "def_bridge", "master_netdev"]
483   REQ_CLUSTER = False
484
485   def BuildHooksEnv(self):
486     """Build hooks env.
487
488     Notes: Since we don't require a cluster, we must manually add
489     ourselves in the post-run node list.
490
491     """
492     env = {"OP_TARGET": self.op.cluster_name}
493     return env, [], [self.hostname.name]
494
495   def CheckPrereq(self):
496     """Verify that the passed name is a valid one.
497
498     """
499     if config.ConfigWriter.IsCluster():
500       raise errors.OpPrereqError("Cluster is already initialised")
501
502     if self.op.hypervisor_type == constants.HT_XEN_HVM31:
503       if not os.path.exists(constants.VNC_PASSWORD_FILE):
504         raise errors.OpPrereqError("Please prepare the cluster VNC"
505                                    "password file %s" %
506                                    constants.VNC_PASSWORD_FILE)
507
508     self.hostname = hostname = utils.HostInfo()
509
510     if hostname.ip.startswith("127."):
511       raise errors.OpPrereqError("This host's IP resolves to the private"
512                                  " range (%s). Please fix DNS or /etc/hosts." %
513                                  (hostname.ip,))
514
515     self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
516
517     if not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, hostname.ip,
518                          constants.DEFAULT_NODED_PORT):
519       raise errors.OpPrereqError("Inconsistency: this host's name resolves"
520                                  " to %s,\nbut this ip address does not"
521                                  " belong to this host."
522                                  " Aborting." % hostname.ip)
523
524     secondary_ip = getattr(self.op, "secondary_ip", None)
525     if secondary_ip and not utils.IsValidIP(secondary_ip):
526       raise errors.OpPrereqError("Invalid secondary ip given")
527     if (secondary_ip and
528         secondary_ip != hostname.ip and
529         (not utils.TcpPing(constants.LOCALHOST_IP_ADDRESS, secondary_ip,
530                            constants.DEFAULT_NODED_PORT))):
531       raise errors.OpPrereqError("You gave %s as secondary IP,"
532                                  " but it does not belong to this host." %
533                                  secondary_ip)
534     self.secondary_ip = secondary_ip
535
536     # checks presence of the volume group given
537     vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
538
539     if vgstatus:
540       raise errors.OpPrereqError("Error: %s" % vgstatus)
541
542     if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
543                     self.op.mac_prefix):
544       raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
545                                  self.op.mac_prefix)
546
547     if self.op.hypervisor_type not in constants.HYPER_TYPES:
548       raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
549                                  self.op.hypervisor_type)
550
551     result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
552     if result.failed:
553       raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
554                                  (self.op.master_netdev,
555                                   result.output.strip()))
556
557     if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
558             os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
559       raise errors.OpPrereqError("Init.d script '%s' missing or not"
560                                  " executable." % constants.NODE_INITD_SCRIPT)
561
562   def Exec(self, feedback_fn):
563     """Initialize the cluster.
564
565     """
566     clustername = self.clustername
567     hostname = self.hostname
568
569     # set up the simple store
570     self.sstore = ss = ssconf.SimpleStore()
571     ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
572     ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
573     ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
574     ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
575     ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
576
577     # set up the inter-node password and certificate
578     _InitGanetiServerSetup(ss)
579
580     # start the master ip
581     rpc.call_node_start_master(hostname.name)
582
583     # set up ssh config and /etc/hosts
584     f = open(constants.SSH_HOST_RSA_PUB, 'r')
585     try:
586       sshline = f.read()
587     finally:
588       f.close()
589     sshkey = sshline.split(" ")[1]
590
591     _AddHostToEtcHosts(hostname.name)
592
593     _UpdateKnownHosts(hostname.name, hostname.ip, sshkey)
594
595     _InitSSHSetup(hostname.name)
596
597     # init of cluster config file
598     self.cfg = cfgw = config.ConfigWriter()
599     cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
600                     sshkey, self.op.mac_prefix,
601                     self.op.vg_name, self.op.def_bridge)
602
603
604 class LUDestroyCluster(NoHooksLU):
605   """Logical unit for destroying the cluster.
606
607   """
608   _OP_REQP = []
609
610   def CheckPrereq(self):
611     """Check prerequisites.
612
613     This checks whether the cluster is empty.
614
615     Any errors are signalled by raising errors.OpPrereqError.
616
617     """
618     master = self.sstore.GetMasterNode()
619
620     nodelist = self.cfg.GetNodeList()
621     if len(nodelist) != 1 or nodelist[0] != master:
622       raise errors.OpPrereqError("There are still %d node(s) in"
623                                  " this cluster." % (len(nodelist) - 1))
624     instancelist = self.cfg.GetInstanceList()
625     if instancelist:
626       raise errors.OpPrereqError("There are still %d instance(s) in"
627                                  " this cluster." % len(instancelist))
628
629   def Exec(self, feedback_fn):
630     """Destroys the cluster.
631
632     """
633     master = self.sstore.GetMasterNode()
634     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
635     utils.CreateBackup(priv_key)
636     utils.CreateBackup(pub_key)
637     rpc.call_node_leave_cluster(master)
638
639
640 class LUVerifyCluster(NoHooksLU):
641   """Verifies the cluster status.
642
643   """
644   _OP_REQP = []
645
646   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
647                   remote_version, feedback_fn):
648     """Run multiple tests against a node.
649
650     Test list:
651       - compares ganeti version
652       - checks vg existance and size > 20G
653       - checks config file checksum
654       - checks ssh to other nodes
655
656     Args:
657       node: name of the node to check
658       file_list: required list of files
659       local_cksum: dictionary of local files and their checksums
660
661     """
662     # compares ganeti version
663     local_version = constants.PROTOCOL_VERSION
664     if not remote_version:
665       feedback_fn(" - ERROR: connection to %s failed" % (node))
666       return True
667
668     if local_version != remote_version:
669       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
670                       (local_version, node, remote_version))
671       return True
672
673     # checks vg existance and size > 20G
674
675     bad = False
676     if not vglist:
677       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
678                       (node,))
679       bad = True
680     else:
681       vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
682       if vgstatus:
683         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
684         bad = True
685
686     # checks config file checksum
687     # checks ssh to any
688
689     if 'filelist' not in node_result:
690       bad = True
691       feedback_fn("  - ERROR: node hasn't returned file checksum data")
692     else:
693       remote_cksum = node_result['filelist']
694       for file_name in file_list:
695         if file_name not in remote_cksum:
696           bad = True
697           feedback_fn("  - ERROR: file '%s' missing" % file_name)
698         elif remote_cksum[file_name] != local_cksum[file_name]:
699           bad = True
700           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
701
702     if 'nodelist' not in node_result:
703       bad = True
704       feedback_fn("  - ERROR: node hasn't returned node connectivity data")
705     else:
706       if node_result['nodelist']:
707         bad = True
708         for node in node_result['nodelist']:
709           feedback_fn("  - ERROR: communication with node '%s': %s" %
710                           (node, node_result['nodelist'][node]))
711     hyp_result = node_result.get('hypervisor', None)
712     if hyp_result is not None:
713       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
714     return bad
715
716   def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
717     """Verify an instance.
718
719     This function checks to see if the required block devices are
720     available on the instance's node.
721
722     """
723     bad = False
724
725     instancelist = self.cfg.GetInstanceList()
726     if not instance in instancelist:
727       feedback_fn("  - ERROR: instance %s not in instance list %s" %
728                       (instance, instancelist))
729       bad = True
730
731     instanceconfig = self.cfg.GetInstanceInfo(instance)
732     node_current = instanceconfig.primary_node
733
734     node_vol_should = {}
735     instanceconfig.MapLVsByNode(node_vol_should)
736
737     for node in node_vol_should:
738       for volume in node_vol_should[node]:
739         if node not in node_vol_is or volume not in node_vol_is[node]:
740           feedback_fn("  - ERROR: volume %s missing on node %s" %
741                           (volume, node))
742           bad = True
743
744     if not instanceconfig.status == 'down':
745       if not instance in node_instance[node_current]:
746         feedback_fn("  - ERROR: instance %s not running on node %s" %
747                         (instance, node_current))
748         bad = True
749
750     for node in node_instance:
751       if (not node == node_current):
752         if instance in node_instance[node]:
753           feedback_fn("  - ERROR: instance %s should not run on node %s" %
754                           (instance, node))
755           bad = True
756
757     return bad
758
759   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
760     """Verify if there are any unknown volumes in the cluster.
761
762     The .os, .swap and backup volumes are ignored. All other volumes are
763     reported as unknown.
764
765     """
766     bad = False
767
768     for node in node_vol_is:
769       for volume in node_vol_is[node]:
770         if node not in node_vol_should or volume not in node_vol_should[node]:
771           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
772                       (volume, node))
773           bad = True
774     return bad
775
776   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
777     """Verify the list of running instances.
778
779     This checks what instances are running but unknown to the cluster.
780
781     """
782     bad = False
783     for node in node_instance:
784       for runninginstance in node_instance[node]:
785         if runninginstance not in instancelist:
786           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
787                           (runninginstance, node))
788           bad = True
789     return bad
790
791   def CheckPrereq(self):
792     """Check prerequisites.
793
794     This has no prerequisites.
795
796     """
797     pass
798
799   def Exec(self, feedback_fn):
800     """Verify integrity of cluster, performing various test on nodes.
801
802     """
803     bad = False
804     feedback_fn("* Verifying global settings")
805     for msg in self.cfg.VerifyConfig():
806       feedback_fn("  - ERROR: %s" % msg)
807
808     vg_name = self.cfg.GetVGName()
809     nodelist = utils.NiceSort(self.cfg.GetNodeList())
810     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
811     node_volume = {}
812     node_instance = {}
813
814     # FIXME: verify OS list
815     # do local checksums
816     file_names = list(self.sstore.GetFileList())
817     file_names.append(constants.SSL_CERT_FILE)
818     file_names.append(constants.CLUSTER_CONF_FILE)
819     local_checksums = utils.FingerprintFiles(file_names)
820
821     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
822     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
823     all_instanceinfo = rpc.call_instance_list(nodelist)
824     all_vglist = rpc.call_vg_list(nodelist)
825     node_verify_param = {
826       'filelist': file_names,
827       'nodelist': nodelist,
828       'hypervisor': None,
829       }
830     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
831     all_rversion = rpc.call_version(nodelist)
832
833     for node in nodelist:
834       feedback_fn("* Verifying node %s" % node)
835       result = self._VerifyNode(node, file_names, local_checksums,
836                                 all_vglist[node], all_nvinfo[node],
837                                 all_rversion[node], feedback_fn)
838       bad = bad or result
839
840       # node_volume
841       volumeinfo = all_volumeinfo[node]
842
843       if type(volumeinfo) != dict:
844         feedback_fn("  - ERROR: connection to %s failed" % (node,))
845         bad = True
846         continue
847
848       node_volume[node] = volumeinfo
849
850       # node_instance
851       nodeinstance = all_instanceinfo[node]
852       if type(nodeinstance) != list:
853         feedback_fn("  - ERROR: connection to %s failed" % (node,))
854         bad = True
855         continue
856
857       node_instance[node] = nodeinstance
858
859     node_vol_should = {}
860
861     for instance in instancelist:
862       feedback_fn("* Verifying instance %s" % instance)
863       result =  self._VerifyInstance(instance, node_volume, node_instance,
864                                      feedback_fn)
865       bad = bad or result
866
867       inst_config = self.cfg.GetInstanceInfo(instance)
868
869       inst_config.MapLVsByNode(node_vol_should)
870
871     feedback_fn("* Verifying orphan volumes")
872     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
873                                        feedback_fn)
874     bad = bad or result
875
876     feedback_fn("* Verifying remaining instances")
877     result = self._VerifyOrphanInstances(instancelist, node_instance,
878                                          feedback_fn)
879     bad = bad or result
880
881     return int(bad)
882
883
884 class LUVerifyDisks(NoHooksLU):
885   """Verifies the cluster disks status.
886
887   """
888   _OP_REQP = []
889
890   def CheckPrereq(self):
891     """Check prerequisites.
892
893     This has no prerequisites.
894
895     """
896     pass
897
898   def Exec(self, feedback_fn):
899     """Verify integrity of cluster disks.
900
901     """
902     result = res_nodes, res_instances = [], []
903
904     vg_name = self.cfg.GetVGName()
905     nodes = utils.NiceSort(self.cfg.GetNodeList())
906     instances = [self.cfg.GetInstanceInfo(name)
907                  for name in self.cfg.GetInstanceList()]
908
909     nv_dict = {}
910     for inst in instances:
911       inst_lvs = {}
912       if (inst.status != "up" or
913           inst.disk_template not in constants.DTS_NET_MIRROR):
914         continue
915       inst.MapLVsByNode(inst_lvs)
916       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
917       for node, vol_list in inst_lvs.iteritems():
918         for vol in vol_list:
919           nv_dict[(node, vol)] = inst
920
921     if not nv_dict:
922       return result
923
924     node_lvs = rpc.call_volume_list(nodes, vg_name)
925
926     to_act = set()
927     for node in nodes:
928       # node_volume
929       lvs = node_lvs[node]
930
931       if not isinstance(lvs, dict):
932         logger.Info("connection to node %s failed or invalid data returned" %
933                     (node,))
934         res_nodes.append(node)
935         continue
936
937       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
938         if not lv_online:
939           inst = nv_dict.get((node, lv_name), None)
940           if inst is not None and inst.name not in res_instances:
941             res_instances.append(inst.name)
942
943     return result
944
945
946 class LURenameCluster(LogicalUnit):
947   """Rename the cluster.
948
949   """
950   HPATH = "cluster-rename"
951   HTYPE = constants.HTYPE_CLUSTER
952   _OP_REQP = ["name"]
953
954   def BuildHooksEnv(self):
955     """Build hooks env.
956
957     """
958     env = {
959       "OP_TARGET": self.op.sstore.GetClusterName(),
960       "NEW_NAME": self.op.name,
961       }
962     mn = self.sstore.GetMasterNode()
963     return env, [mn], [mn]
964
965   def CheckPrereq(self):
966     """Verify that the passed name is a valid one.
967
968     """
969     hostname = utils.HostInfo(self.op.name)
970
971     new_name = hostname.name
972     self.ip = new_ip = hostname.ip
973     old_name = self.sstore.GetClusterName()
974     old_ip = self.sstore.GetMasterIP()
975     if new_name == old_name and new_ip == old_ip:
976       raise errors.OpPrereqError("Neither the name nor the IP address of the"
977                                  " cluster has changed")
978     if new_ip != old_ip:
979       result = utils.RunCmd(["fping", "-q", new_ip])
980       if not result.failed:
981         raise errors.OpPrereqError("The given cluster IP address (%s) is"
982                                    " reachable on the network. Aborting." %
983                                    new_ip)
984
985     self.op.name = new_name
986
987   def Exec(self, feedback_fn):
988     """Rename the cluster.
989
990     """
991     clustername = self.op.name
992     ip = self.ip
993     ss = self.sstore
994
995     # shutdown the master IP
996     master = ss.GetMasterNode()
997     if not rpc.call_node_stop_master(master):
998       raise errors.OpExecError("Could not disable the master role")
999
1000     try:
1001       # modify the sstore
1002       ss.SetKey(ss.SS_MASTER_IP, ip)
1003       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1004
1005       # Distribute updated ss config to all nodes
1006       myself = self.cfg.GetNodeInfo(master)
1007       dist_nodes = self.cfg.GetNodeList()
1008       if myself.name in dist_nodes:
1009         dist_nodes.remove(myself.name)
1010
1011       logger.Debug("Copying updated ssconf data to all nodes")
1012       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1013         fname = ss.KeyToFilename(keyname)
1014         result = rpc.call_upload_file(dist_nodes, fname)
1015         for to_node in dist_nodes:
1016           if not result[to_node]:
1017             logger.Error("copy of file %s to node %s failed" %
1018                          (fname, to_node))
1019     finally:
1020       if not rpc.call_node_start_master(master):
1021         logger.Error("Could not re-enable the master role on the master,"
1022                      " please restart manually.")
1023
1024
1025 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1026   """Sleep and poll for an instance's disk to sync.
1027
1028   """
1029   if not instance.disks:
1030     return True
1031
1032   if not oneshot:
1033     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1034
1035   node = instance.primary_node
1036
1037   for dev in instance.disks:
1038     cfgw.SetDiskID(dev, node)
1039
1040   retries = 0
1041   while True:
1042     max_time = 0
1043     done = True
1044     cumul_degraded = False
1045     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1046     if not rstats:
1047       proc.LogWarning("Can't get any data from node %s" % node)
1048       retries += 1
1049       if retries >= 10:
1050         raise errors.RemoteError("Can't contact node %s for mirror data,"
1051                                  " aborting." % node)
1052       time.sleep(6)
1053       continue
1054     retries = 0
1055     for i in range(len(rstats)):
1056       mstat = rstats[i]
1057       if mstat is None:
1058         proc.LogWarning("Can't compute data for node %s/%s" %
1059                         (node, instance.disks[i].iv_name))
1060         continue
1061       # we ignore the ldisk parameter
1062       perc_done, est_time, is_degraded, _ = mstat
1063       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1064       if perc_done is not None:
1065         done = False
1066         if est_time is not None:
1067           rem_time = "%d estimated seconds remaining" % est_time
1068           max_time = est_time
1069         else:
1070           rem_time = "no time estimate"
1071         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1072                      (instance.disks[i].iv_name, perc_done, rem_time))
1073     if done or oneshot:
1074       break
1075
1076     if unlock:
1077       utils.Unlock('cmd')
1078     try:
1079       time.sleep(min(60, max_time))
1080     finally:
1081       if unlock:
1082         utils.Lock('cmd')
1083
1084   if done:
1085     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1086   return not cumul_degraded
1087
1088
1089 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1090   """Check that mirrors are not degraded.
1091
1092   The ldisk parameter, if True, will change the test from the
1093   is_degraded attribute (which represents overall non-ok status for
1094   the device(s)) to the ldisk (representing the local storage status).
1095
1096   """
1097   cfgw.SetDiskID(dev, node)
1098   if ldisk:
1099     idx = 6
1100   else:
1101     idx = 5
1102
1103   result = True
1104   if on_primary or dev.AssembleOnSecondary():
1105     rstats = rpc.call_blockdev_find(node, dev)
1106     if not rstats:
1107       logger.ToStderr("Can't get any data from node %s" % node)
1108       result = False
1109     else:
1110       result = result and (not rstats[idx])
1111   if dev.children:
1112     for child in dev.children:
1113       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1114
1115   return result
1116
1117
1118 class LUDiagnoseOS(NoHooksLU):
1119   """Logical unit for OS diagnose/query.
1120
1121   """
1122   _OP_REQP = []
1123
1124   def CheckPrereq(self):
1125     """Check prerequisites.
1126
1127     This always succeeds, since this is a pure query LU.
1128
1129     """
1130     return
1131
1132   def Exec(self, feedback_fn):
1133     """Compute the list of OSes.
1134
1135     """
1136     node_list = self.cfg.GetNodeList()
1137     node_data = rpc.call_os_diagnose(node_list)
1138     if node_data == False:
1139       raise errors.OpExecError("Can't gather the list of OSes")
1140     return node_data
1141
1142
1143 class LURemoveNode(LogicalUnit):
1144   """Logical unit for removing a node.
1145
1146   """
1147   HPATH = "node-remove"
1148   HTYPE = constants.HTYPE_NODE
1149   _OP_REQP = ["node_name"]
1150
1151   def BuildHooksEnv(self):
1152     """Build hooks env.
1153
1154     This doesn't run on the target node in the pre phase as a failed
1155     node would not allows itself to run.
1156
1157     """
1158     env = {
1159       "OP_TARGET": self.op.node_name,
1160       "NODE_NAME": self.op.node_name,
1161       }
1162     all_nodes = self.cfg.GetNodeList()
1163     all_nodes.remove(self.op.node_name)
1164     return env, all_nodes, all_nodes
1165
1166   def CheckPrereq(self):
1167     """Check prerequisites.
1168
1169     This checks:
1170      - the node exists in the configuration
1171      - it does not have primary or secondary instances
1172      - it's not the master
1173
1174     Any errors are signalled by raising errors.OpPrereqError.
1175
1176     """
1177     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1178     if node is None:
1179       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1180
1181     instance_list = self.cfg.GetInstanceList()
1182
1183     masternode = self.sstore.GetMasterNode()
1184     if node.name == masternode:
1185       raise errors.OpPrereqError("Node is the master node,"
1186                                  " you need to failover first.")
1187
1188     for instance_name in instance_list:
1189       instance = self.cfg.GetInstanceInfo(instance_name)
1190       if node.name == instance.primary_node:
1191         raise errors.OpPrereqError("Instance %s still running on the node,"
1192                                    " please remove first." % instance_name)
1193       if node.name in instance.secondary_nodes:
1194         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1195                                    " please remove first." % instance_name)
1196     self.op.node_name = node.name
1197     self.node = node
1198
1199   def Exec(self, feedback_fn):
1200     """Removes the node from the cluster.
1201
1202     """
1203     node = self.node
1204     logger.Info("stopping the node daemon and removing configs from node %s" %
1205                 node.name)
1206
1207     rpc.call_node_leave_cluster(node.name)
1208
1209     ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1210
1211     logger.Info("Removing node %s from config" % node.name)
1212
1213     self.cfg.RemoveNode(node.name)
1214
1215     _RemoveHostFromEtcHosts(node.name)
1216
1217
1218 class LUQueryNodes(NoHooksLU):
1219   """Logical unit for querying nodes.
1220
1221   """
1222   _OP_REQP = ["output_fields", "names"]
1223
1224   def CheckPrereq(self):
1225     """Check prerequisites.
1226
1227     This checks that the fields required are valid output fields.
1228
1229     """
1230     self.dynamic_fields = frozenset(["dtotal", "dfree",
1231                                      "mtotal", "mnode", "mfree",
1232                                      "bootid"])
1233
1234     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1235                                "pinst_list", "sinst_list",
1236                                "pip", "sip"],
1237                        dynamic=self.dynamic_fields,
1238                        selected=self.op.output_fields)
1239
1240     self.wanted = _GetWantedNodes(self, self.op.names)
1241
1242   def Exec(self, feedback_fn):
1243     """Computes the list of nodes and their attributes.
1244
1245     """
1246     nodenames = self.wanted
1247     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1248
1249     # begin data gathering
1250
1251     if self.dynamic_fields.intersection(self.op.output_fields):
1252       live_data = {}
1253       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1254       for name in nodenames:
1255         nodeinfo = node_data.get(name, None)
1256         if nodeinfo:
1257           live_data[name] = {
1258             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1259             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1260             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1261             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1262             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1263             "bootid": nodeinfo['bootid'],
1264             }
1265         else:
1266           live_data[name] = {}
1267     else:
1268       live_data = dict.fromkeys(nodenames, {})
1269
1270     node_to_primary = dict([(name, set()) for name in nodenames])
1271     node_to_secondary = dict([(name, set()) for name in nodenames])
1272
1273     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1274                              "sinst_cnt", "sinst_list"))
1275     if inst_fields & frozenset(self.op.output_fields):
1276       instancelist = self.cfg.GetInstanceList()
1277
1278       for instance_name in instancelist:
1279         inst = self.cfg.GetInstanceInfo(instance_name)
1280         if inst.primary_node in node_to_primary:
1281           node_to_primary[inst.primary_node].add(inst.name)
1282         for secnode in inst.secondary_nodes:
1283           if secnode in node_to_secondary:
1284             node_to_secondary[secnode].add(inst.name)
1285
1286     # end data gathering
1287
1288     output = []
1289     for node in nodelist:
1290       node_output = []
1291       for field in self.op.output_fields:
1292         if field == "name":
1293           val = node.name
1294         elif field == "pinst_list":
1295           val = list(node_to_primary[node.name])
1296         elif field == "sinst_list":
1297           val = list(node_to_secondary[node.name])
1298         elif field == "pinst_cnt":
1299           val = len(node_to_primary[node.name])
1300         elif field == "sinst_cnt":
1301           val = len(node_to_secondary[node.name])
1302         elif field == "pip":
1303           val = node.primary_ip
1304         elif field == "sip":
1305           val = node.secondary_ip
1306         elif field in self.dynamic_fields:
1307           val = live_data[node.name].get(field, None)
1308         else:
1309           raise errors.ParameterError(field)
1310         node_output.append(val)
1311       output.append(node_output)
1312
1313     return output
1314
1315
1316 class LUQueryNodeVolumes(NoHooksLU):
1317   """Logical unit for getting volumes on node(s).
1318
1319   """
1320   _OP_REQP = ["nodes", "output_fields"]
1321
1322   def CheckPrereq(self):
1323     """Check prerequisites.
1324
1325     This checks that the fields required are valid output fields.
1326
1327     """
1328     self.nodes = _GetWantedNodes(self, self.op.nodes)
1329
1330     _CheckOutputFields(static=["node"],
1331                        dynamic=["phys", "vg", "name", "size", "instance"],
1332                        selected=self.op.output_fields)
1333
1334
1335   def Exec(self, feedback_fn):
1336     """Computes the list of nodes and their attributes.
1337
1338     """
1339     nodenames = self.nodes
1340     volumes = rpc.call_node_volumes(nodenames)
1341
1342     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1343              in self.cfg.GetInstanceList()]
1344
1345     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1346
1347     output = []
1348     for node in nodenames:
1349       if node not in volumes or not volumes[node]:
1350         continue
1351
1352       node_vols = volumes[node][:]
1353       node_vols.sort(key=lambda vol: vol['dev'])
1354
1355       for vol in node_vols:
1356         node_output = []
1357         for field in self.op.output_fields:
1358           if field == "node":
1359             val = node
1360           elif field == "phys":
1361             val = vol['dev']
1362           elif field == "vg":
1363             val = vol['vg']
1364           elif field == "name":
1365             val = vol['name']
1366           elif field == "size":
1367             val = int(float(vol['size']))
1368           elif field == "instance":
1369             for inst in ilist:
1370               if node not in lv_by_node[inst]:
1371                 continue
1372               if vol['name'] in lv_by_node[inst][node]:
1373                 val = inst.name
1374                 break
1375             else:
1376               val = '-'
1377           else:
1378             raise errors.ParameterError(field)
1379           node_output.append(str(val))
1380
1381         output.append(node_output)
1382
1383     return output
1384
1385
1386 class LUAddNode(LogicalUnit):
1387   """Logical unit for adding node to the cluster.
1388
1389   """
1390   HPATH = "node-add"
1391   HTYPE = constants.HTYPE_NODE
1392   _OP_REQP = ["node_name"]
1393
1394   def BuildHooksEnv(self):
1395     """Build hooks env.
1396
1397     This will run on all nodes before, and on all nodes + the new node after.
1398
1399     """
1400     env = {
1401       "OP_TARGET": self.op.node_name,
1402       "NODE_NAME": self.op.node_name,
1403       "NODE_PIP": self.op.primary_ip,
1404       "NODE_SIP": self.op.secondary_ip,
1405       }
1406     nodes_0 = self.cfg.GetNodeList()
1407     nodes_1 = nodes_0 + [self.op.node_name, ]
1408     return env, nodes_0, nodes_1
1409
1410   def CheckPrereq(self):
1411     """Check prerequisites.
1412
1413     This checks:
1414      - the new node is not already in the config
1415      - it is resolvable
1416      - its parameters (single/dual homed) matches the cluster
1417
1418     Any errors are signalled by raising errors.OpPrereqError.
1419
1420     """
1421     node_name = self.op.node_name
1422     cfg = self.cfg
1423
1424     dns_data = utils.HostInfo(node_name)
1425
1426     node = dns_data.name
1427     primary_ip = self.op.primary_ip = dns_data.ip
1428     secondary_ip = getattr(self.op, "secondary_ip", None)
1429     if secondary_ip is None:
1430       secondary_ip = primary_ip
1431     if not utils.IsValidIP(secondary_ip):
1432       raise errors.OpPrereqError("Invalid secondary IP given")
1433     self.op.secondary_ip = secondary_ip
1434     node_list = cfg.GetNodeList()
1435     if node in node_list:
1436       raise errors.OpPrereqError("Node %s is already in the configuration"
1437                                  % node)
1438
1439     for existing_node_name in node_list:
1440       existing_node = cfg.GetNodeInfo(existing_node_name)
1441       if (existing_node.primary_ip == primary_ip or
1442           existing_node.secondary_ip == primary_ip or
1443           existing_node.primary_ip == secondary_ip or
1444           existing_node.secondary_ip == secondary_ip):
1445         raise errors.OpPrereqError("New node ip address(es) conflict with"
1446                                    " existing node %s" % existing_node.name)
1447
1448     # check that the type of the node (single versus dual homed) is the
1449     # same as for the master
1450     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1451     master_singlehomed = myself.secondary_ip == myself.primary_ip
1452     newbie_singlehomed = secondary_ip == primary_ip
1453     if master_singlehomed != newbie_singlehomed:
1454       if master_singlehomed:
1455         raise errors.OpPrereqError("The master has no private ip but the"
1456                                    " new node has one")
1457       else:
1458         raise errors.OpPrereqError("The master has a private ip but the"
1459                                    " new node doesn't have one")
1460
1461     # checks reachablity
1462     if not utils.TcpPing(utils.HostInfo().name,
1463                          primary_ip,
1464                          constants.DEFAULT_NODED_PORT):
1465       raise errors.OpPrereqError("Node not reachable by ping")
1466
1467     if not newbie_singlehomed:
1468       # check reachability from my secondary ip to newbie's secondary ip
1469       if not utils.TcpPing(myself.secondary_ip,
1470                            secondary_ip,
1471                            constants.DEFAULT_NODED_PORT):
1472         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1473                                    " based ping to noded port")
1474
1475     self.new_node = objects.Node(name=node,
1476                                  primary_ip=primary_ip,
1477                                  secondary_ip=secondary_ip)
1478
1479     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1480       if not os.path.exists(constants.VNC_PASSWORD_FILE):
1481         raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1482                                    constants.VNC_PASSWORD_FILE)
1483
1484   def Exec(self, feedback_fn):
1485     """Adds the new node to the cluster.
1486
1487     """
1488     new_node = self.new_node
1489     node = new_node.name
1490
1491     # set up inter-node password and certificate and restarts the node daemon
1492     gntpass = self.sstore.GetNodeDaemonPassword()
1493     if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1494       raise errors.OpExecError("ganeti password corruption detected")
1495     f = open(constants.SSL_CERT_FILE)
1496     try:
1497       gntpem = f.read(8192)
1498     finally:
1499       f.close()
1500     # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1501     # so we use this to detect an invalid certificate; as long as the
1502     # cert doesn't contain this, the here-document will be correctly
1503     # parsed by the shell sequence below
1504     if re.search('^!EOF\.', gntpem, re.MULTILINE):
1505       raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1506     if not gntpem.endswith("\n"):
1507       raise errors.OpExecError("PEM must end with newline")
1508     logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1509
1510     # and then connect with ssh to set password and start ganeti-noded
1511     # note that all the below variables are sanitized at this point,
1512     # either by being constants or by the checks above
1513     ss = self.sstore
1514     mycommand = ("umask 077 && "
1515                  "echo '%s' > '%s' && "
1516                  "cat > '%s' << '!EOF.' && \n"
1517                  "%s!EOF.\n%s restart" %
1518                  (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1519                   constants.SSL_CERT_FILE, gntpem,
1520                   constants.NODE_INITD_SCRIPT))
1521
1522     result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
1523     if result.failed:
1524       raise errors.OpExecError("Remote command on node %s, error: %s,"
1525                                " output: %s" %
1526                                (node, result.fail_reason, result.output))
1527
1528     # check connectivity
1529     time.sleep(4)
1530
1531     result = rpc.call_version([node])[node]
1532     if result:
1533       if constants.PROTOCOL_VERSION == result:
1534         logger.Info("communication to node %s fine, sw version %s match" %
1535                     (node, result))
1536       else:
1537         raise errors.OpExecError("Version mismatch master version %s,"
1538                                  " node version %s" %
1539                                  (constants.PROTOCOL_VERSION, result))
1540     else:
1541       raise errors.OpExecError("Cannot get version from the new node")
1542
1543     # setup ssh on node
1544     logger.Info("copy ssh key to node %s" % node)
1545     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1546     keyarray = []
1547     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1548                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1549                 priv_key, pub_key]
1550
1551     for i in keyfiles:
1552       f = open(i, 'r')
1553       try:
1554         keyarray.append(f.read())
1555       finally:
1556         f.close()
1557
1558     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1559                                keyarray[3], keyarray[4], keyarray[5])
1560
1561     if not result:
1562       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1563
1564     # Add node to our /etc/hosts, and add key to known_hosts
1565     _AddHostToEtcHosts(new_node.name)
1566
1567     _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1568                       self.cfg.GetHostKey())
1569
1570     if new_node.secondary_ip != new_node.primary_ip:
1571       if not rpc.call_node_tcp_ping(new_node.name,
1572                                     constants.LOCALHOST_IP_ADDRESS,
1573                                     new_node.secondary_ip,
1574                                     constants.DEFAULT_NODED_PORT,
1575                                     10, False):
1576         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1577                                  " you gave (%s). Please fix and re-run this"
1578                                  " command." % new_node.secondary_ip)
1579
1580     success, msg = ssh.VerifyNodeHostname(node)
1581     if not success:
1582       raise errors.OpExecError("Node '%s' claims it has a different hostname"
1583                                " than the one the resolver gives: %s."
1584                                " Please fix and re-run this command." %
1585                                (node, msg))
1586
1587     # Distribute updated /etc/hosts and known_hosts to all nodes,
1588     # including the node just added
1589     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1590     dist_nodes = self.cfg.GetNodeList() + [node]
1591     if myself.name in dist_nodes:
1592       dist_nodes.remove(myself.name)
1593
1594     logger.Debug("Copying hosts and known_hosts to all nodes")
1595     for fname in ("/etc/hosts", constants.SSH_KNOWN_HOSTS_FILE):
1596       result = rpc.call_upload_file(dist_nodes, fname)
1597       for to_node in dist_nodes:
1598         if not result[to_node]:
1599           logger.Error("copy of file %s to node %s failed" %
1600                        (fname, to_node))
1601
1602     to_copy = ss.GetFileList()
1603     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1604       to_copy.append(constants.VNC_PASSWORD_FILE)
1605     for fname in to_copy:
1606       if not ssh.CopyFileToNode(node, fname):
1607         logger.Error("could not copy file %s to node %s" % (fname, node))
1608
1609     logger.Info("adding node %s to cluster.conf" % node)
1610     self.cfg.AddNode(new_node)
1611
1612
1613 class LUMasterFailover(LogicalUnit):
1614   """Failover the master node to the current node.
1615
1616   This is a special LU in that it must run on a non-master node.
1617
1618   """
1619   HPATH = "master-failover"
1620   HTYPE = constants.HTYPE_CLUSTER
1621   REQ_MASTER = False
1622   _OP_REQP = []
1623
1624   def BuildHooksEnv(self):
1625     """Build hooks env.
1626
1627     This will run on the new master only in the pre phase, and on all
1628     the nodes in the post phase.
1629
1630     """
1631     env = {
1632       "OP_TARGET": self.new_master,
1633       "NEW_MASTER": self.new_master,
1634       "OLD_MASTER": self.old_master,
1635       }
1636     return env, [self.new_master], self.cfg.GetNodeList()
1637
1638   def CheckPrereq(self):
1639     """Check prerequisites.
1640
1641     This checks that we are not already the master.
1642
1643     """
1644     self.new_master = utils.HostInfo().name
1645     self.old_master = self.sstore.GetMasterNode()
1646
1647     if self.old_master == self.new_master:
1648       raise errors.OpPrereqError("This commands must be run on the node"
1649                                  " where you want the new master to be."
1650                                  " %s is already the master" %
1651                                  self.old_master)
1652
1653   def Exec(self, feedback_fn):
1654     """Failover the master node.
1655
1656     This command, when run on a non-master node, will cause the current
1657     master to cease being master, and the non-master to become new
1658     master.
1659
1660     """
1661     #TODO: do not rely on gethostname returning the FQDN
1662     logger.Info("setting master to %s, old master: %s" %
1663                 (self.new_master, self.old_master))
1664
1665     if not rpc.call_node_stop_master(self.old_master):
1666       logger.Error("could disable the master role on the old master"
1667                    " %s, please disable manually" % self.old_master)
1668
1669     ss = self.sstore
1670     ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1671     if not rpc.call_upload_file(self.cfg.GetNodeList(),
1672                                 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1673       logger.Error("could not distribute the new simple store master file"
1674                    " to the other nodes, please check.")
1675
1676     if not rpc.call_node_start_master(self.new_master):
1677       logger.Error("could not start the master role on the new master"
1678                    " %s, please check" % self.new_master)
1679       feedback_fn("Error in activating the master IP on the new master,"
1680                   " please fix manually.")
1681
1682
1683
1684 class LUQueryClusterInfo(NoHooksLU):
1685   """Query cluster configuration.
1686
1687   """
1688   _OP_REQP = []
1689   REQ_MASTER = False
1690
1691   def CheckPrereq(self):
1692     """No prerequsites needed for this LU.
1693
1694     """
1695     pass
1696
1697   def Exec(self, feedback_fn):
1698     """Return cluster config.
1699
1700     """
1701     result = {
1702       "name": self.sstore.GetClusterName(),
1703       "software_version": constants.RELEASE_VERSION,
1704       "protocol_version": constants.PROTOCOL_VERSION,
1705       "config_version": constants.CONFIG_VERSION,
1706       "os_api_version": constants.OS_API_VERSION,
1707       "export_version": constants.EXPORT_VERSION,
1708       "master": self.sstore.GetMasterNode(),
1709       "architecture": (platform.architecture()[0], platform.machine()),
1710       }
1711
1712     return result
1713
1714
1715 class LUClusterCopyFile(NoHooksLU):
1716   """Copy file to cluster.
1717
1718   """
1719   _OP_REQP = ["nodes", "filename"]
1720
1721   def CheckPrereq(self):
1722     """Check prerequisites.
1723
1724     It should check that the named file exists and that the given list
1725     of nodes is valid.
1726
1727     """
1728     if not os.path.exists(self.op.filename):
1729       raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1730
1731     self.nodes = _GetWantedNodes(self, self.op.nodes)
1732
1733   def Exec(self, feedback_fn):
1734     """Copy a file from master to some nodes.
1735
1736     Args:
1737       opts - class with options as members
1738       args - list containing a single element, the file name
1739     Opts used:
1740       nodes - list containing the name of target nodes; if empty, all nodes
1741
1742     """
1743     filename = self.op.filename
1744
1745     myname = utils.HostInfo().name
1746
1747     for node in self.nodes:
1748       if node == myname:
1749         continue
1750       if not ssh.CopyFileToNode(node, filename):
1751         logger.Error("Copy of file %s to node %s failed" % (filename, node))
1752
1753
1754 class LUDumpClusterConfig(NoHooksLU):
1755   """Return a text-representation of the cluster-config.
1756
1757   """
1758   _OP_REQP = []
1759
1760   def CheckPrereq(self):
1761     """No prerequisites.
1762
1763     """
1764     pass
1765
1766   def Exec(self, feedback_fn):
1767     """Dump a representation of the cluster config to the standard output.
1768
1769     """
1770     return self.cfg.DumpConfig()
1771
1772
1773 class LURunClusterCommand(NoHooksLU):
1774   """Run a command on some nodes.
1775
1776   """
1777   _OP_REQP = ["command", "nodes"]
1778
1779   def CheckPrereq(self):
1780     """Check prerequisites.
1781
1782     It checks that the given list of nodes is valid.
1783
1784     """
1785     self.nodes = _GetWantedNodes(self, self.op.nodes)
1786
1787   def Exec(self, feedback_fn):
1788     """Run a command on some nodes.
1789
1790     """
1791     data = []
1792     for node in self.nodes:
1793       result = ssh.SSHCall(node, "root", self.op.command)
1794       data.append((node, result.output, result.exit_code))
1795
1796     return data
1797
1798
1799 class LUActivateInstanceDisks(NoHooksLU):
1800   """Bring up an instance's disks.
1801
1802   """
1803   _OP_REQP = ["instance_name"]
1804
1805   def CheckPrereq(self):
1806     """Check prerequisites.
1807
1808     This checks that the instance is in the cluster.
1809
1810     """
1811     instance = self.cfg.GetInstanceInfo(
1812       self.cfg.ExpandInstanceName(self.op.instance_name))
1813     if instance is None:
1814       raise errors.OpPrereqError("Instance '%s' not known" %
1815                                  self.op.instance_name)
1816     self.instance = instance
1817
1818
1819   def Exec(self, feedback_fn):
1820     """Activate the disks.
1821
1822     """
1823     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1824     if not disks_ok:
1825       raise errors.OpExecError("Cannot activate block devices")
1826
1827     return disks_info
1828
1829
1830 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1831   """Prepare the block devices for an instance.
1832
1833   This sets up the block devices on all nodes.
1834
1835   Args:
1836     instance: a ganeti.objects.Instance object
1837     ignore_secondaries: if true, errors on secondary nodes won't result
1838                         in an error return from the function
1839
1840   Returns:
1841     false if the operation failed
1842     list of (host, instance_visible_name, node_visible_name) if the operation
1843          suceeded with the mapping from node devices to instance devices
1844   """
1845   device_info = []
1846   disks_ok = True
1847   for inst_disk in instance.disks:
1848     master_result = None
1849     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1850       cfg.SetDiskID(node_disk, node)
1851       is_primary = node == instance.primary_node
1852       result = rpc.call_blockdev_assemble(node, node_disk,
1853                                           instance.name, is_primary)
1854       if not result:
1855         logger.Error("could not prepare block device %s on node %s"
1856                      " (is_primary=%s)" %
1857                      (inst_disk.iv_name, node, is_primary))
1858         if is_primary or not ignore_secondaries:
1859           disks_ok = False
1860       if is_primary:
1861         master_result = result
1862     device_info.append((instance.primary_node, inst_disk.iv_name,
1863                         master_result))
1864
1865   # leave the disks configured for the primary node
1866   # this is a workaround that would be fixed better by
1867   # improving the logical/physical id handling
1868   for disk in instance.disks:
1869     cfg.SetDiskID(disk, instance.primary_node)
1870
1871   return disks_ok, device_info
1872
1873
1874 def _StartInstanceDisks(cfg, instance, force):
1875   """Start the disks of an instance.
1876
1877   """
1878   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1879                                            ignore_secondaries=force)
1880   if not disks_ok:
1881     _ShutdownInstanceDisks(instance, cfg)
1882     if force is not None and not force:
1883       logger.Error("If the message above refers to a secondary node,"
1884                    " you can retry the operation using '--force'.")
1885     raise errors.OpExecError("Disk consistency error")
1886
1887
1888 class LUDeactivateInstanceDisks(NoHooksLU):
1889   """Shutdown an instance's disks.
1890
1891   """
1892   _OP_REQP = ["instance_name"]
1893
1894   def CheckPrereq(self):
1895     """Check prerequisites.
1896
1897     This checks that the instance is in the cluster.
1898
1899     """
1900     instance = self.cfg.GetInstanceInfo(
1901       self.cfg.ExpandInstanceName(self.op.instance_name))
1902     if instance is None:
1903       raise errors.OpPrereqError("Instance '%s' not known" %
1904                                  self.op.instance_name)
1905     self.instance = instance
1906
1907   def Exec(self, feedback_fn):
1908     """Deactivate the disks
1909
1910     """
1911     instance = self.instance
1912     ins_l = rpc.call_instance_list([instance.primary_node])
1913     ins_l = ins_l[instance.primary_node]
1914     if not type(ins_l) is list:
1915       raise errors.OpExecError("Can't contact node '%s'" %
1916                                instance.primary_node)
1917
1918     if self.instance.name in ins_l:
1919       raise errors.OpExecError("Instance is running, can't shutdown"
1920                                " block devices.")
1921
1922     _ShutdownInstanceDisks(instance, self.cfg)
1923
1924
1925 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1926   """Shutdown block devices of an instance.
1927
1928   This does the shutdown on all nodes of the instance.
1929
1930   If the ignore_primary is false, errors on the primary node are
1931   ignored.
1932
1933   """
1934   result = True
1935   for disk in instance.disks:
1936     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1937       cfg.SetDiskID(top_disk, node)
1938       if not rpc.call_blockdev_shutdown(node, top_disk):
1939         logger.Error("could not shutdown block device %s on node %s" %
1940                      (disk.iv_name, node))
1941         if not ignore_primary or node != instance.primary_node:
1942           result = False
1943   return result
1944
1945
1946 class LUStartupInstance(LogicalUnit):
1947   """Starts an instance.
1948
1949   """
1950   HPATH = "instance-start"
1951   HTYPE = constants.HTYPE_INSTANCE
1952   _OP_REQP = ["instance_name", "force"]
1953
1954   def BuildHooksEnv(self):
1955     """Build hooks env.
1956
1957     This runs on master, primary and secondary nodes of the instance.
1958
1959     """
1960     env = {
1961       "FORCE": self.op.force,
1962       }
1963     env.update(_BuildInstanceHookEnvByObject(self.instance))
1964     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1965           list(self.instance.secondary_nodes))
1966     return env, nl, nl
1967
1968   def CheckPrereq(self):
1969     """Check prerequisites.
1970
1971     This checks that the instance is in the cluster.
1972
1973     """
1974     instance = self.cfg.GetInstanceInfo(
1975       self.cfg.ExpandInstanceName(self.op.instance_name))
1976     if instance is None:
1977       raise errors.OpPrereqError("Instance '%s' not known" %
1978                                  self.op.instance_name)
1979
1980     # check bridges existance
1981     _CheckInstanceBridgesExist(instance)
1982
1983     self.instance = instance
1984     self.op.instance_name = instance.name
1985
1986   def Exec(self, feedback_fn):
1987     """Start the instance.
1988
1989     """
1990     instance = self.instance
1991     force = self.op.force
1992     extra_args = getattr(self.op, "extra_args", "")
1993
1994     node_current = instance.primary_node
1995
1996     nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
1997     if not nodeinfo:
1998       raise errors.OpExecError("Could not contact node %s for infos" %
1999                                (node_current))
2000
2001     freememory = nodeinfo[node_current]['memory_free']
2002     memory = instance.memory
2003     if memory > freememory:
2004       raise errors.OpExecError("Not enough memory to start instance"
2005                                " %s on node %s"
2006                                " needed %s MiB, available %s MiB" %
2007                                (instance.name, node_current, memory,
2008                                 freememory))
2009
2010     _StartInstanceDisks(self.cfg, instance, force)
2011
2012     if not rpc.call_instance_start(node_current, instance, extra_args):
2013       _ShutdownInstanceDisks(instance, self.cfg)
2014       raise errors.OpExecError("Could not start instance")
2015
2016     self.cfg.MarkInstanceUp(instance.name)
2017
2018
2019 class LURebootInstance(LogicalUnit):
2020   """Reboot an instance.
2021
2022   """
2023   HPATH = "instance-reboot"
2024   HTYPE = constants.HTYPE_INSTANCE
2025   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2026
2027   def BuildHooksEnv(self):
2028     """Build hooks env.
2029
2030     This runs on master, primary and secondary nodes of the instance.
2031
2032     """
2033     env = {
2034       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2035       }
2036     env.update(_BuildInstanceHookEnvByObject(self.instance))
2037     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2038           list(self.instance.secondary_nodes))
2039     return env, nl, nl
2040
2041   def CheckPrereq(self):
2042     """Check prerequisites.
2043
2044     This checks that the instance is in the cluster.
2045
2046     """
2047     instance = self.cfg.GetInstanceInfo(
2048       self.cfg.ExpandInstanceName(self.op.instance_name))
2049     if instance is None:
2050       raise errors.OpPrereqError("Instance '%s' not known" %
2051                                  self.op.instance_name)
2052
2053     # check bridges existance
2054     _CheckInstanceBridgesExist(instance)
2055
2056     self.instance = instance
2057     self.op.instance_name = instance.name
2058
2059   def Exec(self, feedback_fn):
2060     """Reboot the instance.
2061
2062     """
2063     instance = self.instance
2064     ignore_secondaries = self.op.ignore_secondaries
2065     reboot_type = self.op.reboot_type
2066     extra_args = getattr(self.op, "extra_args", "")
2067
2068     node_current = instance.primary_node
2069
2070     if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2071                            constants.INSTANCE_REBOOT_HARD,
2072                            constants.INSTANCE_REBOOT_FULL]:
2073       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2074                                   (constants.INSTANCE_REBOOT_SOFT,
2075                                    constants.INSTANCE_REBOOT_HARD,
2076                                    constants.INSTANCE_REBOOT_FULL))
2077
2078     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2079                        constants.INSTANCE_REBOOT_HARD]:
2080       if not rpc.call_instance_reboot(node_current, instance,
2081                                       reboot_type, extra_args):
2082         raise errors.OpExecError("Could not reboot instance")
2083     else:
2084       if not rpc.call_instance_shutdown(node_current, instance):
2085         raise errors.OpExecError("could not shutdown instance for full reboot")
2086       _ShutdownInstanceDisks(instance, self.cfg)
2087       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2088       if not rpc.call_instance_start(node_current, instance, extra_args):
2089         _ShutdownInstanceDisks(instance, self.cfg)
2090         raise errors.OpExecError("Could not start instance for full reboot")
2091
2092     self.cfg.MarkInstanceUp(instance.name)
2093
2094
2095 class LUShutdownInstance(LogicalUnit):
2096   """Shutdown an instance.
2097
2098   """
2099   HPATH = "instance-stop"
2100   HTYPE = constants.HTYPE_INSTANCE
2101   _OP_REQP = ["instance_name"]
2102
2103   def BuildHooksEnv(self):
2104     """Build hooks env.
2105
2106     This runs on master, primary and secondary nodes of the instance.
2107
2108     """
2109     env = _BuildInstanceHookEnvByObject(self.instance)
2110     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2111           list(self.instance.secondary_nodes))
2112     return env, nl, nl
2113
2114   def CheckPrereq(self):
2115     """Check prerequisites.
2116
2117     This checks that the instance is in the cluster.
2118
2119     """
2120     instance = self.cfg.GetInstanceInfo(
2121       self.cfg.ExpandInstanceName(self.op.instance_name))
2122     if instance is None:
2123       raise errors.OpPrereqError("Instance '%s' not known" %
2124                                  self.op.instance_name)
2125     self.instance = instance
2126
2127   def Exec(self, feedback_fn):
2128     """Shutdown the instance.
2129
2130     """
2131     instance = self.instance
2132     node_current = instance.primary_node
2133     if not rpc.call_instance_shutdown(node_current, instance):
2134       logger.Error("could not shutdown instance")
2135
2136     self.cfg.MarkInstanceDown(instance.name)
2137     _ShutdownInstanceDisks(instance, self.cfg)
2138
2139
2140 class LUReinstallInstance(LogicalUnit):
2141   """Reinstall an instance.
2142
2143   """
2144   HPATH = "instance-reinstall"
2145   HTYPE = constants.HTYPE_INSTANCE
2146   _OP_REQP = ["instance_name"]
2147
2148   def BuildHooksEnv(self):
2149     """Build hooks env.
2150
2151     This runs on master, primary and secondary nodes of the instance.
2152
2153     """
2154     env = _BuildInstanceHookEnvByObject(self.instance)
2155     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2156           list(self.instance.secondary_nodes))
2157     return env, nl, nl
2158
2159   def CheckPrereq(self):
2160     """Check prerequisites.
2161
2162     This checks that the instance is in the cluster and is not running.
2163
2164     """
2165     instance = self.cfg.GetInstanceInfo(
2166       self.cfg.ExpandInstanceName(self.op.instance_name))
2167     if instance is None:
2168       raise errors.OpPrereqError("Instance '%s' not known" %
2169                                  self.op.instance_name)
2170     if instance.disk_template == constants.DT_DISKLESS:
2171       raise errors.OpPrereqError("Instance '%s' has no disks" %
2172                                  self.op.instance_name)
2173     if instance.status != "down":
2174       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2175                                  self.op.instance_name)
2176     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2177     if remote_info:
2178       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2179                                  (self.op.instance_name,
2180                                   instance.primary_node))
2181
2182     self.op.os_type = getattr(self.op, "os_type", None)
2183     if self.op.os_type is not None:
2184       # OS verification
2185       pnode = self.cfg.GetNodeInfo(
2186         self.cfg.ExpandNodeName(instance.primary_node))
2187       if pnode is None:
2188         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2189                                    self.op.pnode)
2190       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2191       if not os_obj:
2192         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2193                                    " primary node"  % self.op.os_type)
2194
2195     self.instance = instance
2196
2197   def Exec(self, feedback_fn):
2198     """Reinstall the instance.
2199
2200     """
2201     inst = self.instance
2202
2203     if self.op.os_type is not None:
2204       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2205       inst.os = self.op.os_type
2206       self.cfg.AddInstance(inst)
2207
2208     _StartInstanceDisks(self.cfg, inst, None)
2209     try:
2210       feedback_fn("Running the instance OS create scripts...")
2211       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2212         raise errors.OpExecError("Could not install OS for instance %s"
2213                                  " on node %s" %
2214                                  (inst.name, inst.primary_node))
2215     finally:
2216       _ShutdownInstanceDisks(inst, self.cfg)
2217
2218
2219 class LURenameInstance(LogicalUnit):
2220   """Rename an instance.
2221
2222   """
2223   HPATH = "instance-rename"
2224   HTYPE = constants.HTYPE_INSTANCE
2225   _OP_REQP = ["instance_name", "new_name"]
2226
2227   def BuildHooksEnv(self):
2228     """Build hooks env.
2229
2230     This runs on master, primary and secondary nodes of the instance.
2231
2232     """
2233     env = _BuildInstanceHookEnvByObject(self.instance)
2234     env["INSTANCE_NEW_NAME"] = self.op.new_name
2235     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2236           list(self.instance.secondary_nodes))
2237     return env, nl, nl
2238
2239   def CheckPrereq(self):
2240     """Check prerequisites.
2241
2242     This checks that the instance is in the cluster and is not running.
2243
2244     """
2245     instance = self.cfg.GetInstanceInfo(
2246       self.cfg.ExpandInstanceName(self.op.instance_name))
2247     if instance is None:
2248       raise errors.OpPrereqError("Instance '%s' not known" %
2249                                  self.op.instance_name)
2250     if instance.status != "down":
2251       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2252                                  self.op.instance_name)
2253     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2254     if remote_info:
2255       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2256                                  (self.op.instance_name,
2257                                   instance.primary_node))
2258     self.instance = instance
2259
2260     # new name verification
2261     name_info = utils.HostInfo(self.op.new_name)
2262
2263     self.op.new_name = new_name = name_info.name
2264     if not getattr(self.op, "ignore_ip", False):
2265       command = ["fping", "-q", name_info.ip]
2266       result = utils.RunCmd(command)
2267       if not result.failed:
2268         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2269                                    (name_info.ip, new_name))
2270
2271
2272   def Exec(self, feedback_fn):
2273     """Reinstall the instance.
2274
2275     """
2276     inst = self.instance
2277     old_name = inst.name
2278
2279     self.cfg.RenameInstance(inst.name, self.op.new_name)
2280
2281     # re-read the instance from the configuration after rename
2282     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2283
2284     _StartInstanceDisks(self.cfg, inst, None)
2285     try:
2286       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2287                                           "sda", "sdb"):
2288         msg = ("Could run OS rename script for instance %s on node %s (but the"
2289                " instance has been renamed in Ganeti)" %
2290                (inst.name, inst.primary_node))
2291         logger.Error(msg)
2292     finally:
2293       _ShutdownInstanceDisks(inst, self.cfg)
2294
2295
2296 class LURemoveInstance(LogicalUnit):
2297   """Remove an instance.
2298
2299   """
2300   HPATH = "instance-remove"
2301   HTYPE = constants.HTYPE_INSTANCE
2302   _OP_REQP = ["instance_name"]
2303
2304   def BuildHooksEnv(self):
2305     """Build hooks env.
2306
2307     This runs on master, primary and secondary nodes of the instance.
2308
2309     """
2310     env = _BuildInstanceHookEnvByObject(self.instance)
2311     nl = [self.sstore.GetMasterNode()]
2312     return env, nl, nl
2313
2314   def CheckPrereq(self):
2315     """Check prerequisites.
2316
2317     This checks that the instance is in the cluster.
2318
2319     """
2320     instance = self.cfg.GetInstanceInfo(
2321       self.cfg.ExpandInstanceName(self.op.instance_name))
2322     if instance is None:
2323       raise errors.OpPrereqError("Instance '%s' not known" %
2324                                  self.op.instance_name)
2325     self.instance = instance
2326
2327   def Exec(self, feedback_fn):
2328     """Remove the instance.
2329
2330     """
2331     instance = self.instance
2332     logger.Info("shutting down instance %s on node %s" %
2333                 (instance.name, instance.primary_node))
2334
2335     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2336       if self.op.ignore_failures:
2337         feedback_fn("Warning: can't shutdown instance")
2338       else:
2339         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2340                                  (instance.name, instance.primary_node))
2341
2342     logger.Info("removing block devices for instance %s" % instance.name)
2343
2344     if not _RemoveDisks(instance, self.cfg):
2345       if self.op.ignore_failures:
2346         feedback_fn("Warning: can't remove instance's disks")
2347       else:
2348         raise errors.OpExecError("Can't remove instance's disks")
2349
2350     logger.Info("removing instance %s out of cluster config" % instance.name)
2351
2352     self.cfg.RemoveInstance(instance.name)
2353
2354
2355 class LUQueryInstances(NoHooksLU):
2356   """Logical unit for querying instances.
2357
2358   """
2359   _OP_REQP = ["output_fields", "names"]
2360
2361   def CheckPrereq(self):
2362     """Check prerequisites.
2363
2364     This checks that the fields required are valid output fields.
2365
2366     """
2367     self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2368     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2369                                "admin_state", "admin_ram",
2370                                "disk_template", "ip", "mac", "bridge",
2371                                "sda_size", "sdb_size"],
2372                        dynamic=self.dynamic_fields,
2373                        selected=self.op.output_fields)
2374
2375     self.wanted = _GetWantedInstances(self, self.op.names)
2376
2377   def Exec(self, feedback_fn):
2378     """Computes the list of nodes and their attributes.
2379
2380     """
2381     instance_names = self.wanted
2382     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2383                      in instance_names]
2384
2385     # begin data gathering
2386
2387     nodes = frozenset([inst.primary_node for inst in instance_list])
2388
2389     bad_nodes = []
2390     if self.dynamic_fields.intersection(self.op.output_fields):
2391       live_data = {}
2392       node_data = rpc.call_all_instances_info(nodes)
2393       for name in nodes:
2394         result = node_data[name]
2395         if result:
2396           live_data.update(result)
2397         elif result == False:
2398           bad_nodes.append(name)
2399         # else no instance is alive
2400     else:
2401       live_data = dict([(name, {}) for name in instance_names])
2402
2403     # end data gathering
2404
2405     output = []
2406     for instance in instance_list:
2407       iout = []
2408       for field in self.op.output_fields:
2409         if field == "name":
2410           val = instance.name
2411         elif field == "os":
2412           val = instance.os
2413         elif field == "pnode":
2414           val = instance.primary_node
2415         elif field == "snodes":
2416           val = list(instance.secondary_nodes)
2417         elif field == "admin_state":
2418           val = (instance.status != "down")
2419         elif field == "oper_state":
2420           if instance.primary_node in bad_nodes:
2421             val = None
2422           else:
2423             val = bool(live_data.get(instance.name))
2424         elif field == "admin_ram":
2425           val = instance.memory
2426         elif field == "oper_ram":
2427           if instance.primary_node in bad_nodes:
2428             val = None
2429           elif instance.name in live_data:
2430             val = live_data[instance.name].get("memory", "?")
2431           else:
2432             val = "-"
2433         elif field == "disk_template":
2434           val = instance.disk_template
2435         elif field == "ip":
2436           val = instance.nics[0].ip
2437         elif field == "bridge":
2438           val = instance.nics[0].bridge
2439         elif field == "mac":
2440           val = instance.nics[0].mac
2441         elif field == "sda_size" or field == "sdb_size":
2442           disk = instance.FindDisk(field[:3])
2443           if disk is None:
2444             val = None
2445           else:
2446             val = disk.size
2447         else:
2448           raise errors.ParameterError(field)
2449         iout.append(val)
2450       output.append(iout)
2451
2452     return output
2453
2454
2455 class LUFailoverInstance(LogicalUnit):
2456   """Failover an instance.
2457
2458   """
2459   HPATH = "instance-failover"
2460   HTYPE = constants.HTYPE_INSTANCE
2461   _OP_REQP = ["instance_name", "ignore_consistency"]
2462
2463   def BuildHooksEnv(self):
2464     """Build hooks env.
2465
2466     This runs on master, primary and secondary nodes of the instance.
2467
2468     """
2469     env = {
2470       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2471       }
2472     env.update(_BuildInstanceHookEnvByObject(self.instance))
2473     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2474     return env, nl, nl
2475
2476   def CheckPrereq(self):
2477     """Check prerequisites.
2478
2479     This checks that the instance is in the cluster.
2480
2481     """
2482     instance = self.cfg.GetInstanceInfo(
2483       self.cfg.ExpandInstanceName(self.op.instance_name))
2484     if instance is None:
2485       raise errors.OpPrereqError("Instance '%s' not known" %
2486                                  self.op.instance_name)
2487
2488     if instance.disk_template not in constants.DTS_NET_MIRROR:
2489       raise errors.OpPrereqError("Instance's disk layout is not"
2490                                  " network mirrored, cannot failover.")
2491
2492     secondary_nodes = instance.secondary_nodes
2493     if not secondary_nodes:
2494       raise errors.ProgrammerError("no secondary node but using "
2495                                    "DT_REMOTE_RAID1 template")
2496
2497     # check memory requirements on the secondary node
2498     target_node = secondary_nodes[0]
2499     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2500     info = nodeinfo.get(target_node, None)
2501     if not info:
2502       raise errors.OpPrereqError("Cannot get current information"
2503                                  " from node '%s'" % nodeinfo)
2504     if instance.memory > info['memory_free']:
2505       raise errors.OpPrereqError("Not enough memory on target node %s."
2506                                  " %d MB available, %d MB required" %
2507                                  (target_node, info['memory_free'],
2508                                   instance.memory))
2509
2510     # check bridge existance
2511     brlist = [nic.bridge for nic in instance.nics]
2512     if not rpc.call_bridges_exist(target_node, brlist):
2513       raise errors.OpPrereqError("One or more target bridges %s does not"
2514                                  " exist on destination node '%s'" %
2515                                  (brlist, target_node))
2516
2517     self.instance = instance
2518
2519   def Exec(self, feedback_fn):
2520     """Failover an instance.
2521
2522     The failover is done by shutting it down on its present node and
2523     starting it on the secondary.
2524
2525     """
2526     instance = self.instance
2527
2528     source_node = instance.primary_node
2529     target_node = instance.secondary_nodes[0]
2530
2531     feedback_fn("* checking disk consistency between source and target")
2532     for dev in instance.disks:
2533       # for remote_raid1, these are md over drbd
2534       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2535         if not self.op.ignore_consistency:
2536           raise errors.OpExecError("Disk %s is degraded on target node,"
2537                                    " aborting failover." % dev.iv_name)
2538
2539     feedback_fn("* checking target node resource availability")
2540     nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2541
2542     if not nodeinfo:
2543       raise errors.OpExecError("Could not contact target node %s." %
2544                                target_node)
2545
2546     free_memory = int(nodeinfo[target_node]['memory_free'])
2547     memory = instance.memory
2548     if memory > free_memory:
2549       raise errors.OpExecError("Not enough memory to create instance %s on"
2550                                " node %s. needed %s MiB, available %s MiB" %
2551                                (instance.name, target_node, memory,
2552                                 free_memory))
2553
2554     feedback_fn("* shutting down instance on source node")
2555     logger.Info("Shutting down instance %s on node %s" %
2556                 (instance.name, source_node))
2557
2558     if not rpc.call_instance_shutdown(source_node, instance):
2559       if self.op.ignore_consistency:
2560         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2561                      " anyway. Please make sure node %s is down"  %
2562                      (instance.name, source_node, source_node))
2563       else:
2564         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2565                                  (instance.name, source_node))
2566
2567     feedback_fn("* deactivating the instance's disks on source node")
2568     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2569       raise errors.OpExecError("Can't shut down the instance's disks.")
2570
2571     instance.primary_node = target_node
2572     # distribute new instance config to the other nodes
2573     self.cfg.AddInstance(instance)
2574
2575     feedback_fn("* activating the instance's disks on target node")
2576     logger.Info("Starting instance %s on node %s" %
2577                 (instance.name, target_node))
2578
2579     disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2580                                              ignore_secondaries=True)
2581     if not disks_ok:
2582       _ShutdownInstanceDisks(instance, self.cfg)
2583       raise errors.OpExecError("Can't activate the instance's disks")
2584
2585     feedback_fn("* starting the instance on the target node")
2586     if not rpc.call_instance_start(target_node, instance, None):
2587       _ShutdownInstanceDisks(instance, self.cfg)
2588       raise errors.OpExecError("Could not start instance %s on node %s." %
2589                                (instance.name, target_node))
2590
2591
2592 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2593   """Create a tree of block devices on the primary node.
2594
2595   This always creates all devices.
2596
2597   """
2598   if device.children:
2599     for child in device.children:
2600       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2601         return False
2602
2603   cfg.SetDiskID(device, node)
2604   new_id = rpc.call_blockdev_create(node, device, device.size,
2605                                     instance.name, True, info)
2606   if not new_id:
2607     return False
2608   if device.physical_id is None:
2609     device.physical_id = new_id
2610   return True
2611
2612
2613 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2614   """Create a tree of block devices on a secondary node.
2615
2616   If this device type has to be created on secondaries, create it and
2617   all its children.
2618
2619   If not, just recurse to children keeping the same 'force' value.
2620
2621   """
2622   if device.CreateOnSecondary():
2623     force = True
2624   if device.children:
2625     for child in device.children:
2626       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2627                                         child, force, info):
2628         return False
2629
2630   if not force:
2631     return True
2632   cfg.SetDiskID(device, node)
2633   new_id = rpc.call_blockdev_create(node, device, device.size,
2634                                     instance.name, False, info)
2635   if not new_id:
2636     return False
2637   if device.physical_id is None:
2638     device.physical_id = new_id
2639   return True
2640
2641
2642 def _GenerateUniqueNames(cfg, exts):
2643   """Generate a suitable LV name.
2644
2645   This will generate a logical volume name for the given instance.
2646
2647   """
2648   results = []
2649   for val in exts:
2650     new_id = cfg.GenerateUniqueID()
2651     results.append("%s%s" % (new_id, val))
2652   return results
2653
2654
2655 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2656   """Generate a drbd device complete with its children.
2657
2658   """
2659   port = cfg.AllocatePort()
2660   vgname = cfg.GetVGName()
2661   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2662                           logical_id=(vgname, names[0]))
2663   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2664                           logical_id=(vgname, names[1]))
2665   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2666                           logical_id = (primary, secondary, port),
2667                           children = [dev_data, dev_meta])
2668   return drbd_dev
2669
2670
2671 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2672   """Generate a drbd8 device complete with its children.
2673
2674   """
2675   port = cfg.AllocatePort()
2676   vgname = cfg.GetVGName()
2677   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2678                           logical_id=(vgname, names[0]))
2679   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2680                           logical_id=(vgname, names[1]))
2681   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2682                           logical_id = (primary, secondary, port),
2683                           children = [dev_data, dev_meta],
2684                           iv_name=iv_name)
2685   return drbd_dev
2686
2687 def _GenerateDiskTemplate(cfg, template_name,
2688                           instance_name, primary_node,
2689                           secondary_nodes, disk_sz, swap_sz):
2690   """Generate the entire disk layout for a given template type.
2691
2692   """
2693   #TODO: compute space requirements
2694
2695   vgname = cfg.GetVGName()
2696   if template_name == "diskless":
2697     disks = []
2698   elif template_name == "plain":
2699     if len(secondary_nodes) != 0:
2700       raise errors.ProgrammerError("Wrong template configuration")
2701
2702     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2703     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2704                            logical_id=(vgname, names[0]),
2705                            iv_name = "sda")
2706     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2707                            logical_id=(vgname, names[1]),
2708                            iv_name = "sdb")
2709     disks = [sda_dev, sdb_dev]
2710   elif template_name == "local_raid1":
2711     if len(secondary_nodes) != 0:
2712       raise errors.ProgrammerError("Wrong template configuration")
2713
2714
2715     names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
2716                                        ".sdb_m1", ".sdb_m2"])
2717     sda_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2718                               logical_id=(vgname, names[0]))
2719     sda_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2720                               logical_id=(vgname, names[1]))
2721     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sda",
2722                               size=disk_sz,
2723                               children = [sda_dev_m1, sda_dev_m2])
2724     sdb_dev_m1 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2725                               logical_id=(vgname, names[2]))
2726     sdb_dev_m2 = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2727                               logical_id=(vgname, names[3]))
2728     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name = "sdb",
2729                               size=swap_sz,
2730                               children = [sdb_dev_m1, sdb_dev_m2])
2731     disks = [md_sda_dev, md_sdb_dev]
2732   elif template_name == constants.DT_REMOTE_RAID1:
2733     if len(secondary_nodes) != 1:
2734       raise errors.ProgrammerError("Wrong template configuration")
2735     remote_node = secondary_nodes[0]
2736     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2737                                        ".sdb_data", ".sdb_meta"])
2738     drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2739                                          disk_sz, names[0:2])
2740     md_sda_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sda",
2741                               children = [drbd_sda_dev], size=disk_sz)
2742     drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
2743                                          swap_sz, names[2:4])
2744     md_sdb_dev = objects.Disk(dev_type=constants.LD_MD_R1, iv_name="sdb",
2745                               children = [drbd_sdb_dev], size=swap_sz)
2746     disks = [md_sda_dev, md_sdb_dev]
2747   elif template_name == constants.DT_DRBD8:
2748     if len(secondary_nodes) != 1:
2749       raise errors.ProgrammerError("Wrong template configuration")
2750     remote_node = secondary_nodes[0]
2751     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2752                                        ".sdb_data", ".sdb_meta"])
2753     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2754                                          disk_sz, names[0:2], "sda")
2755     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2756                                          swap_sz, names[2:4], "sdb")
2757     disks = [drbd_sda_dev, drbd_sdb_dev]
2758   else:
2759     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2760   return disks
2761
2762
2763 def _GetInstanceInfoText(instance):
2764   """Compute that text that should be added to the disk's metadata.
2765
2766   """
2767   return "originstname+%s" % instance.name
2768
2769
2770 def _CreateDisks(cfg, instance):
2771   """Create all disks for an instance.
2772
2773   This abstracts away some work from AddInstance.
2774
2775   Args:
2776     instance: the instance object
2777
2778   Returns:
2779     True or False showing the success of the creation process
2780
2781   """
2782   info = _GetInstanceInfoText(instance)
2783
2784   for device in instance.disks:
2785     logger.Info("creating volume %s for instance %s" %
2786               (device.iv_name, instance.name))
2787     #HARDCODE
2788     for secondary_node in instance.secondary_nodes:
2789       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2790                                         device, False, info):
2791         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2792                      (device.iv_name, device, secondary_node))
2793         return False
2794     #HARDCODE
2795     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2796                                     instance, device, info):
2797       logger.Error("failed to create volume %s on primary!" %
2798                    device.iv_name)
2799       return False
2800   return True
2801
2802
2803 def _RemoveDisks(instance, cfg):
2804   """Remove all disks for an instance.
2805
2806   This abstracts away some work from `AddInstance()` and
2807   `RemoveInstance()`. Note that in case some of the devices couldn't
2808   be removed, the removal will continue with the other ones (compare
2809   with `_CreateDisks()`).
2810
2811   Args:
2812     instance: the instance object
2813
2814   Returns:
2815     True or False showing the success of the removal proces
2816
2817   """
2818   logger.Info("removing block devices for instance %s" % instance.name)
2819
2820   result = True
2821   for device in instance.disks:
2822     for node, disk in device.ComputeNodeTree(instance.primary_node):
2823       cfg.SetDiskID(disk, node)
2824       if not rpc.call_blockdev_remove(node, disk):
2825         logger.Error("could not remove block device %s on node %s,"
2826                      " continuing anyway" %
2827                      (device.iv_name, node))
2828         result = False
2829   return result
2830
2831
2832 class LUCreateInstance(LogicalUnit):
2833   """Create an instance.
2834
2835   """
2836   HPATH = "instance-add"
2837   HTYPE = constants.HTYPE_INSTANCE
2838   _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2839               "disk_template", "swap_size", "mode", "start", "vcpus",
2840               "wait_for_sync", "ip_check", "mac"]
2841
2842   def BuildHooksEnv(self):
2843     """Build hooks env.
2844
2845     This runs on master, primary and secondary nodes of the instance.
2846
2847     """
2848     env = {
2849       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2850       "INSTANCE_DISK_SIZE": self.op.disk_size,
2851       "INSTANCE_SWAP_SIZE": self.op.swap_size,
2852       "INSTANCE_ADD_MODE": self.op.mode,
2853       }
2854     if self.op.mode == constants.INSTANCE_IMPORT:
2855       env["INSTANCE_SRC_NODE"] = self.op.src_node
2856       env["INSTANCE_SRC_PATH"] = self.op.src_path
2857       env["INSTANCE_SRC_IMAGE"] = self.src_image
2858
2859     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2860       primary_node=self.op.pnode,
2861       secondary_nodes=self.secondaries,
2862       status=self.instance_status,
2863       os_type=self.op.os_type,
2864       memory=self.op.mem_size,
2865       vcpus=self.op.vcpus,
2866       nics=[(self.inst_ip, self.op.bridge)],
2867     ))
2868
2869     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2870           self.secondaries)
2871     return env, nl, nl
2872
2873
2874   def CheckPrereq(self):
2875     """Check prerequisites.
2876
2877     """
2878     if self.op.mode not in (constants.INSTANCE_CREATE,
2879                             constants.INSTANCE_IMPORT):
2880       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2881                                  self.op.mode)
2882
2883     if self.op.mode == constants.INSTANCE_IMPORT:
2884       src_node = getattr(self.op, "src_node", None)
2885       src_path = getattr(self.op, "src_path", None)
2886       if src_node is None or src_path is None:
2887         raise errors.OpPrereqError("Importing an instance requires source"
2888                                    " node and path options")
2889       src_node_full = self.cfg.ExpandNodeName(src_node)
2890       if src_node_full is None:
2891         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2892       self.op.src_node = src_node = src_node_full
2893
2894       if not os.path.isabs(src_path):
2895         raise errors.OpPrereqError("The source path must be absolute")
2896
2897       export_info = rpc.call_export_info(src_node, src_path)
2898
2899       if not export_info:
2900         raise errors.OpPrereqError("No export found in dir %s" % src_path)
2901
2902       if not export_info.has_section(constants.INISECT_EXP):
2903         raise errors.ProgrammerError("Corrupted export config")
2904
2905       ei_version = export_info.get(constants.INISECT_EXP, 'version')
2906       if (int(ei_version) != constants.EXPORT_VERSION):
2907         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2908                                    (ei_version, constants.EXPORT_VERSION))
2909
2910       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2911         raise errors.OpPrereqError("Can't import instance with more than"
2912                                    " one data disk")
2913
2914       # FIXME: are the old os-es, disk sizes, etc. useful?
2915       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2916       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2917                                                          'disk0_dump'))
2918       self.src_image = diskimage
2919     else: # INSTANCE_CREATE
2920       if getattr(self.op, "os_type", None) is None:
2921         raise errors.OpPrereqError("No guest OS specified")
2922
2923     # check primary node
2924     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
2925     if pnode is None:
2926       raise errors.OpPrereqError("Primary node '%s' is unknown" %
2927                                  self.op.pnode)
2928     self.op.pnode = pnode.name
2929     self.pnode = pnode
2930     self.secondaries = []
2931     # disk template and mirror node verification
2932     if self.op.disk_template not in constants.DISK_TEMPLATES:
2933       raise errors.OpPrereqError("Invalid disk template name")
2934
2935     if self.op.disk_template in constants.DTS_NET_MIRROR:
2936       if getattr(self.op, "snode", None) is None:
2937         raise errors.OpPrereqError("The networked disk templates need"
2938                                    " a mirror node")
2939
2940       snode_name = self.cfg.ExpandNodeName(self.op.snode)
2941       if snode_name is None:
2942         raise errors.OpPrereqError("Unknown secondary node '%s'" %
2943                                    self.op.snode)
2944       elif snode_name == pnode.name:
2945         raise errors.OpPrereqError("The secondary node cannot be"
2946                                    " the primary node.")
2947       self.secondaries.append(snode_name)
2948
2949     # Check lv size requirements
2950     nodenames = [pnode.name] + self.secondaries
2951     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
2952
2953     # Required free disk space as a function of disk and swap space
2954     req_size_dict = {
2955       constants.DT_DISKLESS: 0,
2956       constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
2957       constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
2958       # 256 MB are added for drbd metadata, 128MB for each drbd device
2959       constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
2960       constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
2961     }
2962
2963     if self.op.disk_template not in req_size_dict:
2964       raise errors.ProgrammerError("Disk template '%s' size requirement"
2965                                    " is unknown" %  self.op.disk_template)
2966
2967     req_size = req_size_dict[self.op.disk_template]
2968
2969     for node in nodenames:
2970       info = nodeinfo.get(node, None)
2971       if not info:
2972         raise errors.OpPrereqError("Cannot get current information"
2973                                    " from node '%s'" % nodeinfo)
2974       if req_size > info['vg_free']:
2975         raise errors.OpPrereqError("Not enough disk space on target node %s."
2976                                    " %d MB available, %d MB required" %
2977                                    (node, info['vg_free'], req_size))
2978
2979     # os verification
2980     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2981     if not os_obj:
2982       raise errors.OpPrereqError("OS '%s' not in supported os list for"
2983                                  " primary node"  % self.op.os_type)
2984
2985     # instance verification
2986     hostname1 = utils.HostInfo(self.op.instance_name)
2987
2988     self.op.instance_name = instance_name = hostname1.name
2989     instance_list = self.cfg.GetInstanceList()
2990     if instance_name in instance_list:
2991       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2992                                  instance_name)
2993
2994     ip = getattr(self.op, "ip", None)
2995     if ip is None or ip.lower() == "none":
2996       inst_ip = None
2997     elif ip.lower() == "auto":
2998       inst_ip = hostname1.ip
2999     else:
3000       if not utils.IsValidIP(ip):
3001         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3002                                    " like a valid IP" % ip)
3003       inst_ip = ip
3004     self.inst_ip = inst_ip
3005
3006     if self.op.start and not self.op.ip_check:
3007       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3008                                  " adding an instance in start mode")
3009
3010     if self.op.ip_check:
3011       if utils.TcpPing(utils.HostInfo().name, hostname1.ip,
3012                        constants.DEFAULT_NODED_PORT):
3013         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3014                                    (hostname1.ip, instance_name))
3015
3016     # MAC address verification
3017     if self.op.mac != "auto":
3018       if not utils.IsValidMac(self.op.mac.lower()):
3019         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3020                                    self.op.mac)
3021
3022     # bridge verification
3023     bridge = getattr(self.op, "bridge", None)
3024     if bridge is None:
3025       self.op.bridge = self.cfg.GetDefBridge()
3026     else:
3027       self.op.bridge = bridge
3028
3029     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3030       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3031                                  " destination node '%s'" %
3032                                  (self.op.bridge, pnode.name))
3033
3034     if self.op.start:
3035       self.instance_status = 'up'
3036     else:
3037       self.instance_status = 'down'
3038
3039   def Exec(self, feedback_fn):
3040     """Create and add the instance to the cluster.
3041
3042     """
3043     instance = self.op.instance_name
3044     pnode_name = self.pnode.name
3045
3046     if self.op.mac == "auto":
3047       mac_address=self.cfg.GenerateMAC()
3048     else:
3049       mac_address=self.op.mac
3050
3051     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3052     if self.inst_ip is not None:
3053       nic.ip = self.inst_ip
3054
3055     ht_kind = self.sstore.GetHypervisorType()
3056     if ht_kind in constants.HTS_REQ_PORT:
3057       network_port = self.cfg.AllocatePort()
3058     else:
3059       network_port = None
3060
3061     disks = _GenerateDiskTemplate(self.cfg,
3062                                   self.op.disk_template,
3063                                   instance, pnode_name,
3064                                   self.secondaries, self.op.disk_size,
3065                                   self.op.swap_size)
3066
3067     iobj = objects.Instance(name=instance, os=self.op.os_type,
3068                             primary_node=pnode_name,
3069                             memory=self.op.mem_size,
3070                             vcpus=self.op.vcpus,
3071                             nics=[nic], disks=disks,
3072                             disk_template=self.op.disk_template,
3073                             status=self.instance_status,
3074                             network_port=network_port,
3075                             )
3076
3077     feedback_fn("* creating instance disks...")
3078     if not _CreateDisks(self.cfg, iobj):
3079       _RemoveDisks(iobj, self.cfg)
3080       raise errors.OpExecError("Device creation failed, reverting...")
3081
3082     feedback_fn("adding instance %s to cluster config" % instance)
3083
3084     self.cfg.AddInstance(iobj)
3085
3086     if self.op.wait_for_sync:
3087       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3088     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3089       # make sure the disks are not degraded (still sync-ing is ok)
3090       time.sleep(15)
3091       feedback_fn("* checking mirrors status")
3092       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3093     else:
3094       disk_abort = False
3095
3096     if disk_abort:
3097       _RemoveDisks(iobj, self.cfg)
3098       self.cfg.RemoveInstance(iobj.name)
3099       raise errors.OpExecError("There are some degraded disks for"
3100                                " this instance")
3101
3102     feedback_fn("creating os for instance %s on node %s" %
3103                 (instance, pnode_name))
3104
3105     if iobj.disk_template != constants.DT_DISKLESS:
3106       if self.op.mode == constants.INSTANCE_CREATE:
3107         feedback_fn("* running the instance OS create scripts...")
3108         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3109           raise errors.OpExecError("could not add os for instance %s"
3110                                    " on node %s" %
3111                                    (instance, pnode_name))
3112
3113       elif self.op.mode == constants.INSTANCE_IMPORT:
3114         feedback_fn("* running the instance OS import scripts...")
3115         src_node = self.op.src_node
3116         src_image = self.src_image
3117         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3118                                                 src_node, src_image):
3119           raise errors.OpExecError("Could not import os for instance"
3120                                    " %s on node %s" %
3121                                    (instance, pnode_name))
3122       else:
3123         # also checked in the prereq part
3124         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3125                                      % self.op.mode)
3126
3127     if self.op.start:
3128       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3129       feedback_fn("* starting instance...")
3130       if not rpc.call_instance_start(pnode_name, iobj, None):
3131         raise errors.OpExecError("Could not start instance")
3132
3133
3134 class LUConnectConsole(NoHooksLU):
3135   """Connect to an instance's console.
3136
3137   This is somewhat special in that it returns the command line that
3138   you need to run on the master node in order to connect to the
3139   console.
3140
3141   """
3142   _OP_REQP = ["instance_name"]
3143
3144   def CheckPrereq(self):
3145     """Check prerequisites.
3146
3147     This checks that the instance is in the cluster.
3148
3149     """
3150     instance = self.cfg.GetInstanceInfo(
3151       self.cfg.ExpandInstanceName(self.op.instance_name))
3152     if instance is None:
3153       raise errors.OpPrereqError("Instance '%s' not known" %
3154                                  self.op.instance_name)
3155     self.instance = instance
3156
3157   def Exec(self, feedback_fn):
3158     """Connect to the console of an instance
3159
3160     """
3161     instance = self.instance
3162     node = instance.primary_node
3163
3164     node_insts = rpc.call_instance_list([node])[node]
3165     if node_insts is False:
3166       raise errors.OpExecError("Can't connect to node %s." % node)
3167
3168     if instance.name not in node_insts:
3169       raise errors.OpExecError("Instance %s is not running." % instance.name)
3170
3171     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3172
3173     hyper = hypervisor.GetHypervisor()
3174     console_cmd = hyper.GetShellCommandForConsole(instance)
3175     # build ssh cmdline
3176     argv = ["ssh", "-q", "-t"]
3177     argv.extend(ssh.KNOWN_HOSTS_OPTS)
3178     argv.extend(ssh.BATCH_MODE_OPTS)
3179     argv.append(node)
3180     argv.append(console_cmd)
3181     return "ssh", argv
3182
3183
3184 class LUAddMDDRBDComponent(LogicalUnit):
3185   """Adda new mirror member to an instance's disk.
3186
3187   """
3188   HPATH = "mirror-add"
3189   HTYPE = constants.HTYPE_INSTANCE
3190   _OP_REQP = ["instance_name", "remote_node", "disk_name"]
3191
3192   def BuildHooksEnv(self):
3193     """Build hooks env.
3194
3195     This runs on the master, the primary and all the secondaries.
3196
3197     """
3198     env = {
3199       "NEW_SECONDARY": self.op.remote_node,
3200       "DISK_NAME": self.op.disk_name,
3201       }
3202     env.update(_BuildInstanceHookEnvByObject(self.instance))
3203     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
3204           self.op.remote_node,] + list(self.instance.secondary_nodes)
3205     return env, nl, nl
3206
3207   def CheckPrereq(self):
3208     """Check prerequisites.
3209
3210     This checks that the instance is in the cluster.
3211
3212     """
3213     instance = self.cfg.GetInstanceInfo(
3214       self.cfg.ExpandInstanceName(self.op.instance_name))
3215     if instance is None:
3216       raise errors.OpPrereqError("Instance '%s' not known" %
3217                                  self.op.instance_name)
3218     self.instance = instance
3219
3220     remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3221     if remote_node is None:
3222       raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
3223     self.remote_node = remote_node
3224
3225     if remote_node == instance.primary_node:
3226       raise errors.OpPrereqError("The specified node is the primary node of"
3227                                  " the instance.")
3228
3229     if instance.disk_template != constants.DT_REMOTE_RAID1:
3230       raise errors.OpPrereqError("Instance's disk layout is not"
3231                                  " remote_raid1.")
3232     for disk in instance.disks:
3233       if disk.iv_name == self.op.disk_name:
3234         break
3235     else:
3236       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3237                                  " instance." % self.op.disk_name)
3238     if len(disk.children) > 1:
3239       raise errors.OpPrereqError("The device already has two slave devices."
3240                                  " This would create a 3-disk raid1 which we"
3241                                  " don't allow.")
3242     self.disk = disk
3243
3244   def Exec(self, feedback_fn):
3245     """Add the mirror component
3246
3247     """
3248     disk = self.disk
3249     instance = self.instance
3250
3251     remote_node = self.remote_node
3252     lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
3253     names = _GenerateUniqueNames(self.cfg, lv_names)
3254     new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
3255                                      remote_node, disk.size, names)
3256
3257     logger.Info("adding new mirror component on secondary")
3258     #HARDCODE
3259     if not _CreateBlockDevOnSecondary(self.cfg, remote_node, instance,
3260                                       new_drbd, False,
3261                                       _GetInstanceInfoText(instance)):
3262       raise errors.OpExecError("Failed to create new component on secondary"
3263                                " node %s" % remote_node)
3264
3265     logger.Info("adding new mirror component on primary")
3266     #HARDCODE
3267     if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node,
3268                                     instance, new_drbd,
3269                                     _GetInstanceInfoText(instance)):
3270       # remove secondary dev
3271       self.cfg.SetDiskID(new_drbd, remote_node)
3272       rpc.call_blockdev_remove(remote_node, new_drbd)
3273       raise errors.OpExecError("Failed to create volume on primary")
3274
3275     # the device exists now
3276     # call the primary node to add the mirror to md
3277     logger.Info("adding new mirror component to md")
3278     if not rpc.call_blockdev_addchildren(instance.primary_node,
3279                                          disk, [new_drbd]):
3280       logger.Error("Can't add mirror compoment to md!")
3281       self.cfg.SetDiskID(new_drbd, remote_node)
3282       if not rpc.call_blockdev_remove(remote_node, new_drbd):
3283         logger.Error("Can't rollback on secondary")
3284       self.cfg.SetDiskID(new_drbd, instance.primary_node)
3285       if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3286         logger.Error("Can't rollback on primary")
3287       raise errors.OpExecError("Can't add mirror component to md array")
3288
3289     disk.children.append(new_drbd)
3290
3291     self.cfg.AddInstance(instance)
3292
3293     _WaitForSync(self.cfg, instance, self.proc)
3294
3295     return 0
3296
3297
3298 class LURemoveMDDRBDComponent(LogicalUnit):
3299   """Remove a component from a remote_raid1 disk.
3300
3301   """
3302   HPATH = "mirror-remove"
3303   HTYPE = constants.HTYPE_INSTANCE
3304   _OP_REQP = ["instance_name", "disk_name", "disk_id"]
3305
3306   def BuildHooksEnv(self):
3307     """Build hooks env.
3308
3309     This runs on the master, the primary and all the secondaries.
3310
3311     """
3312     env = {
3313       "DISK_NAME": self.op.disk_name,
3314       "DISK_ID": self.op.disk_id,
3315       "OLD_SECONDARY": self.old_secondary,
3316       }
3317     env.update(_BuildInstanceHookEnvByObject(self.instance))
3318     nl = [self.sstore.GetMasterNode(),
3319           self.instance.primary_node] + list(self.instance.secondary_nodes)
3320     return env, nl, nl
3321
3322   def CheckPrereq(self):
3323     """Check prerequisites.
3324
3325     This checks that the instance is in the cluster.
3326
3327     """
3328     instance = self.cfg.GetInstanceInfo(
3329       self.cfg.ExpandInstanceName(self.op.instance_name))
3330     if instance is None:
3331       raise errors.OpPrereqError("Instance '%s' not known" %
3332                                  self.op.instance_name)
3333     self.instance = instance
3334
3335     if instance.disk_template != constants.DT_REMOTE_RAID1:
3336       raise errors.OpPrereqError("Instance's disk layout is not"
3337                                  " remote_raid1.")
3338     for disk in instance.disks:
3339       if disk.iv_name == self.op.disk_name:
3340         break
3341     else:
3342       raise errors.OpPrereqError("Can't find this device ('%s') in the"
3343                                  " instance." % self.op.disk_name)
3344     for child in disk.children:
3345       if (child.dev_type == constants.LD_DRBD7 and
3346           child.logical_id[2] == self.op.disk_id):
3347         break
3348     else:
3349       raise errors.OpPrereqError("Can't find the device with this port.")
3350
3351     if len(disk.children) < 2:
3352       raise errors.OpPrereqError("Cannot remove the last component from"
3353                                  " a mirror.")
3354     self.disk = disk
3355     self.child = child
3356     if self.child.logical_id[0] == instance.primary_node:
3357       oid = 1
3358     else:
3359       oid = 0
3360     self.old_secondary = self.child.logical_id[oid]
3361
3362   def Exec(self, feedback_fn):
3363     """Remove the mirror component
3364
3365     """
3366     instance = self.instance
3367     disk = self.disk
3368     child = self.child
3369     logger.Info("remove mirror component")
3370     self.cfg.SetDiskID(disk, instance.primary_node)
3371     if not rpc.call_blockdev_removechildren(instance.primary_node,
3372                                             disk, [child]):
3373       raise errors.OpExecError("Can't remove child from mirror.")
3374
3375     for node in child.logical_id[:2]:
3376       self.cfg.SetDiskID(child, node)
3377       if not rpc.call_blockdev_remove(node, child):
3378         logger.Error("Warning: failed to remove device from node %s,"
3379                      " continuing operation." % node)
3380
3381     disk.children.remove(child)
3382     self.cfg.AddInstance(instance)
3383
3384
3385 class LUReplaceDisks(LogicalUnit):
3386   """Replace the disks of an instance.
3387
3388   """
3389   HPATH = "mirrors-replace"
3390   HTYPE = constants.HTYPE_INSTANCE
3391   _OP_REQP = ["instance_name", "mode", "disks"]
3392
3393   def BuildHooksEnv(self):
3394     """Build hooks env.
3395
3396     This runs on the master, the primary and all the secondaries.
3397
3398     """
3399     env = {
3400       "MODE": self.op.mode,
3401       "NEW_SECONDARY": self.op.remote_node,
3402       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3403       }
3404     env.update(_BuildInstanceHookEnvByObject(self.instance))
3405     nl = [
3406       self.sstore.GetMasterNode(),
3407       self.instance.primary_node,
3408       ]
3409     if self.op.remote_node is not None:
3410       nl.append(self.op.remote_node)
3411     return env, nl, nl
3412
3413   def CheckPrereq(self):
3414     """Check prerequisites.
3415
3416     This checks that the instance is in the cluster.
3417
3418     """
3419     instance = self.cfg.GetInstanceInfo(
3420       self.cfg.ExpandInstanceName(self.op.instance_name))
3421     if instance is None:
3422       raise errors.OpPrereqError("Instance '%s' not known" %
3423                                  self.op.instance_name)
3424     self.instance = instance
3425     self.op.instance_name = instance.name
3426
3427     if instance.disk_template not in constants.DTS_NET_MIRROR:
3428       raise errors.OpPrereqError("Instance's disk layout is not"
3429                                  " network mirrored.")
3430
3431     if len(instance.secondary_nodes) != 1:
3432       raise errors.OpPrereqError("The instance has a strange layout,"
3433                                  " expected one secondary but found %d" %
3434                                  len(instance.secondary_nodes))
3435
3436     self.sec_node = instance.secondary_nodes[0]
3437
3438     remote_node = getattr(self.op, "remote_node", None)
3439     if remote_node is not None:
3440       remote_node = self.cfg.ExpandNodeName(remote_node)
3441       if remote_node is None:
3442         raise errors.OpPrereqError("Node '%s' not known" %
3443                                    self.op.remote_node)
3444       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3445     else:
3446       self.remote_node_info = None
3447     if remote_node == instance.primary_node:
3448       raise errors.OpPrereqError("The specified node is the primary node of"
3449                                  " the instance.")
3450     elif remote_node == self.sec_node:
3451       if self.op.mode == constants.REPLACE_DISK_SEC:
3452         # this is for DRBD8, where we can't execute the same mode of
3453         # replacement as for drbd7 (no different port allocated)
3454         raise errors.OpPrereqError("Same secondary given, cannot execute"
3455                                    " replacement")
3456       # the user gave the current secondary, switch to
3457       # 'no-replace-secondary' mode for drbd7
3458       remote_node = None
3459     if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3460         self.op.mode != constants.REPLACE_DISK_ALL):
3461       raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3462                                  " disks replacement, not individual ones")
3463     if instance.disk_template == constants.DT_DRBD8:
3464       if (self.op.mode == constants.REPLACE_DISK_ALL and
3465           remote_node is not None):
3466         # switch to replace secondary mode
3467         self.op.mode = constants.REPLACE_DISK_SEC
3468
3469       if self.op.mode == constants.REPLACE_DISK_ALL:
3470         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3471                                    " secondary disk replacement, not"
3472                                    " both at once")
3473       elif self.op.mode == constants.REPLACE_DISK_PRI:
3474         if remote_node is not None:
3475           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3476                                      " the secondary while doing a primary"
3477                                      " node disk replacement")
3478         self.tgt_node = instance.primary_node
3479         self.oth_node = instance.secondary_nodes[0]
3480       elif self.op.mode == constants.REPLACE_DISK_SEC:
3481         self.new_node = remote_node # this can be None, in which case
3482                                     # we don't change the secondary
3483         self.tgt_node = instance.secondary_nodes[0]
3484         self.oth_node = instance.primary_node
3485       else:
3486         raise errors.ProgrammerError("Unhandled disk replace mode")
3487
3488     for name in self.op.disks:
3489       if instance.FindDisk(name) is None:
3490         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3491                                    (name, instance.name))
3492     self.op.remote_node = remote_node
3493
3494   def _ExecRR1(self, feedback_fn):
3495     """Replace the disks of an instance.
3496
3497     """
3498     instance = self.instance
3499     iv_names = {}
3500     # start of work
3501     if self.op.remote_node is None:
3502       remote_node = self.sec_node
3503     else:
3504       remote_node = self.op.remote_node
3505     cfg = self.cfg
3506     for dev in instance.disks:
3507       size = dev.size
3508       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3509       names = _GenerateUniqueNames(cfg, lv_names)
3510       new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3511                                        remote_node, size, names)
3512       iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3513       logger.Info("adding new mirror component on secondary for %s" %
3514                   dev.iv_name)
3515       #HARDCODE
3516       if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3517                                         new_drbd, False,
3518                                         _GetInstanceInfoText(instance)):
3519         raise errors.OpExecError("Failed to create new component on secondary"
3520                                  " node %s. Full abort, cleanup manually!" %
3521                                  remote_node)
3522
3523       logger.Info("adding new mirror component on primary")
3524       #HARDCODE
3525       if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3526                                       instance, new_drbd,
3527                                       _GetInstanceInfoText(instance)):
3528         # remove secondary dev
3529         cfg.SetDiskID(new_drbd, remote_node)
3530         rpc.call_blockdev_remove(remote_node, new_drbd)
3531         raise errors.OpExecError("Failed to create volume on primary!"
3532                                  " Full abort, cleanup manually!!")
3533
3534       # the device exists now
3535       # call the primary node to add the mirror to md
3536       logger.Info("adding new mirror component to md")
3537       if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3538                                            [new_drbd]):
3539         logger.Error("Can't add mirror compoment to md!")
3540         cfg.SetDiskID(new_drbd, remote_node)
3541         if not rpc.call_blockdev_remove(remote_node, new_drbd):
3542           logger.Error("Can't rollback on secondary")
3543         cfg.SetDiskID(new_drbd, instance.primary_node)
3544         if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3545           logger.Error("Can't rollback on primary")
3546         raise errors.OpExecError("Full abort, cleanup manually!!")
3547
3548       dev.children.append(new_drbd)
3549       cfg.AddInstance(instance)
3550
3551     # this can fail as the old devices are degraded and _WaitForSync
3552     # does a combined result over all disks, so we don't check its
3553     # return value
3554     _WaitForSync(cfg, instance, self.proc, unlock=True)
3555
3556     # so check manually all the devices
3557     for name in iv_names:
3558       dev, child, new_drbd = iv_names[name]
3559       cfg.SetDiskID(dev, instance.primary_node)
3560       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3561       if is_degr:
3562         raise errors.OpExecError("MD device %s is degraded!" % name)
3563       cfg.SetDiskID(new_drbd, instance.primary_node)
3564       is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3565       if is_degr:
3566         raise errors.OpExecError("New drbd device %s is degraded!" % name)
3567
3568     for name in iv_names:
3569       dev, child, new_drbd = iv_names[name]
3570       logger.Info("remove mirror %s component" % name)
3571       cfg.SetDiskID(dev, instance.primary_node)
3572       if not rpc.call_blockdev_removechildren(instance.primary_node,
3573                                               dev, [child]):
3574         logger.Error("Can't remove child from mirror, aborting"
3575                      " *this device cleanup*.\nYou need to cleanup manually!!")
3576         continue
3577
3578       for node in child.logical_id[:2]:
3579         logger.Info("remove child device on %s" % node)
3580         cfg.SetDiskID(child, node)
3581         if not rpc.call_blockdev_remove(node, child):
3582           logger.Error("Warning: failed to remove device from node %s,"
3583                        " continuing operation." % node)
3584
3585       dev.children.remove(child)
3586
3587       cfg.AddInstance(instance)
3588
3589   def _ExecD8DiskOnly(self, feedback_fn):
3590     """Replace a disk on the primary or secondary for dbrd8.
3591
3592     The algorithm for replace is quite complicated:
3593       - for each disk to be replaced:
3594         - create new LVs on the target node with unique names
3595         - detach old LVs from the drbd device
3596         - rename old LVs to name_replaced.<time_t>
3597         - rename new LVs to old LVs
3598         - attach the new LVs (with the old names now) to the drbd device
3599       - wait for sync across all devices
3600       - for each modified disk:
3601         - remove old LVs (which have the name name_replaces.<time_t>)
3602
3603     Failures are not very well handled.
3604
3605     """
3606     steps_total = 6
3607     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3608     instance = self.instance
3609     iv_names = {}
3610     vgname = self.cfg.GetVGName()
3611     # start of work
3612     cfg = self.cfg
3613     tgt_node = self.tgt_node
3614     oth_node = self.oth_node
3615
3616     # Step: check device activation
3617     self.proc.LogStep(1, steps_total, "check device existence")
3618     info("checking volume groups")
3619     my_vg = cfg.GetVGName()
3620     results = rpc.call_vg_list([oth_node, tgt_node])
3621     if not results:
3622       raise errors.OpExecError("Can't list volume groups on the nodes")
3623     for node in oth_node, tgt_node:
3624       res = results.get(node, False)
3625       if not res or my_vg not in res:
3626         raise errors.OpExecError("Volume group '%s' not found on %s" %
3627                                  (my_vg, node))
3628     for dev in instance.disks:
3629       if not dev.iv_name in self.op.disks:
3630         continue
3631       for node in tgt_node, oth_node:
3632         info("checking %s on %s" % (dev.iv_name, node))
3633         cfg.SetDiskID(dev, node)
3634         if not rpc.call_blockdev_find(node, dev):
3635           raise errors.OpExecError("Can't find device %s on node %s" %
3636                                    (dev.iv_name, node))
3637
3638     # Step: check other node consistency
3639     self.proc.LogStep(2, steps_total, "check peer consistency")
3640     for dev in instance.disks:
3641       if not dev.iv_name in self.op.disks:
3642         continue
3643       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3644       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3645                                    oth_node==instance.primary_node):
3646         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3647                                  " to replace disks on this node (%s)" %
3648                                  (oth_node, tgt_node))
3649
3650     # Step: create new storage
3651     self.proc.LogStep(3, steps_total, "allocate new storage")
3652     for dev in instance.disks:
3653       if not dev.iv_name in self.op.disks:
3654         continue
3655       size = dev.size
3656       cfg.SetDiskID(dev, tgt_node)
3657       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3658       names = _GenerateUniqueNames(cfg, lv_names)
3659       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3660                              logical_id=(vgname, names[0]))
3661       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3662                              logical_id=(vgname, names[1]))
3663       new_lvs = [lv_data, lv_meta]
3664       old_lvs = dev.children
3665       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3666       info("creating new local storage on %s for %s" %
3667            (tgt_node, dev.iv_name))
3668       # since we *always* want to create this LV, we use the
3669       # _Create...OnPrimary (which forces the creation), even if we
3670       # are talking about the secondary node
3671       for new_lv in new_lvs:
3672         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3673                                         _GetInstanceInfoText(instance)):
3674           raise errors.OpExecError("Failed to create new LV named '%s' on"
3675                                    " node '%s'" %
3676                                    (new_lv.logical_id[1], tgt_node))
3677
3678     # Step: for each lv, detach+rename*2+attach
3679     self.proc.LogStep(4, steps_total, "change drbd configuration")
3680     for dev, old_lvs, new_lvs in iv_names.itervalues():
3681       info("detaching %s drbd from local storage" % dev.iv_name)
3682       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3683         raise errors.OpExecError("Can't detach drbd from local storage on node"
3684                                  " %s for device %s" % (tgt_node, dev.iv_name))
3685       #dev.children = []
3686       #cfg.Update(instance)
3687
3688       # ok, we created the new LVs, so now we know we have the needed
3689       # storage; as such, we proceed on the target node to rename
3690       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3691       # using the assumption than logical_id == physical_id (which in
3692       # turn is the unique_id on that node)
3693
3694       # FIXME(iustin): use a better name for the replaced LVs
3695       temp_suffix = int(time.time())
3696       ren_fn = lambda d, suff: (d.physical_id[0],
3697                                 d.physical_id[1] + "_replaced-%s" % suff)
3698       # build the rename list based on what LVs exist on the node
3699       rlist = []
3700       for to_ren in old_lvs:
3701         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3702         if find_res is not None: # device exists
3703           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3704
3705       info("renaming the old LVs on the target node")
3706       if not rpc.call_blockdev_rename(tgt_node, rlist):
3707         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3708       # now we rename the new LVs to the old LVs
3709       info("renaming the new LVs on the target node")
3710       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3711       if not rpc.call_blockdev_rename(tgt_node, rlist):
3712         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3713
3714       for old, new in zip(old_lvs, new_lvs):
3715         new.logical_id = old.logical_id
3716         cfg.SetDiskID(new, tgt_node)
3717
3718       for disk in old_lvs:
3719         disk.logical_id = ren_fn(disk, temp_suffix)
3720         cfg.SetDiskID(disk, tgt_node)
3721
3722       # now that the new lvs have the old name, we can add them to the device
3723       info("adding new mirror component on %s" % tgt_node)
3724       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3725         for new_lv in new_lvs:
3726           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3727             warning("Can't rollback device %s", hint="manually cleanup unused"
3728                     " logical volumes")
3729         raise errors.OpExecError("Can't add local storage to drbd")
3730
3731       dev.children = new_lvs
3732       cfg.Update(instance)
3733
3734     # Step: wait for sync
3735
3736     # this can fail as the old devices are degraded and _WaitForSync
3737     # does a combined result over all disks, so we don't check its
3738     # return value
3739     self.proc.LogStep(5, steps_total, "sync devices")
3740     _WaitForSync(cfg, instance, self.proc, unlock=True)
3741
3742     # so check manually all the devices
3743     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3744       cfg.SetDiskID(dev, instance.primary_node)
3745       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3746       if is_degr:
3747         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3748
3749     # Step: remove old storage
3750     self.proc.LogStep(6, steps_total, "removing old storage")
3751     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3752       info("remove logical volumes for %s" % name)
3753       for lv in old_lvs:
3754         cfg.SetDiskID(lv, tgt_node)
3755         if not rpc.call_blockdev_remove(tgt_node, lv):
3756           warning("Can't remove old LV", hint="manually remove unused LVs")
3757           continue
3758
3759   def _ExecD8Secondary(self, feedback_fn):
3760     """Replace the secondary node for drbd8.
3761
3762     The algorithm for replace is quite complicated:
3763       - for all disks of the instance:
3764         - create new LVs on the new node with same names
3765         - shutdown the drbd device on the old secondary
3766         - disconnect the drbd network on the primary
3767         - create the drbd device on the new secondary
3768         - network attach the drbd on the primary, using an artifice:
3769           the drbd code for Attach() will connect to the network if it
3770           finds a device which is connected to the good local disks but
3771           not network enabled
3772       - wait for sync across all devices
3773       - remove all disks from the old secondary
3774
3775     Failures are not very well handled.
3776
3777     """
3778     steps_total = 6
3779     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3780     instance = self.instance
3781     iv_names = {}
3782     vgname = self.cfg.GetVGName()
3783     # start of work
3784     cfg = self.cfg
3785     old_node = self.tgt_node
3786     new_node = self.new_node
3787     pri_node = instance.primary_node
3788
3789     # Step: check device activation
3790     self.proc.LogStep(1, steps_total, "check device existence")
3791     info("checking volume groups")
3792     my_vg = cfg.GetVGName()
3793     results = rpc.call_vg_list([pri_node, new_node])
3794     if not results:
3795       raise errors.OpExecError("Can't list volume groups on the nodes")
3796     for node in pri_node, new_node:
3797       res = results.get(node, False)
3798       if not res or my_vg not in res:
3799         raise errors.OpExecError("Volume group '%s' not found on %s" %
3800                                  (my_vg, node))
3801     for dev in instance.disks:
3802       if not dev.iv_name in self.op.disks:
3803         continue
3804       info("checking %s on %s" % (dev.iv_name, pri_node))
3805       cfg.SetDiskID(dev, pri_node)
3806       if not rpc.call_blockdev_find(pri_node, dev):
3807         raise errors.OpExecError("Can't find device %s on node %s" %
3808                                  (dev.iv_name, pri_node))
3809
3810     # Step: check other node consistency
3811     self.proc.LogStep(2, steps_total, "check peer consistency")
3812     for dev in instance.disks:
3813       if not dev.iv_name in self.op.disks:
3814         continue
3815       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3816       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3817         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3818                                  " unsafe to replace the secondary" %
3819                                  pri_node)
3820
3821     # Step: create new storage
3822     self.proc.LogStep(3, steps_total, "allocate new storage")
3823     for dev in instance.disks:
3824       size = dev.size
3825       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3826       # since we *always* want to create this LV, we use the
3827       # _Create...OnPrimary (which forces the creation), even if we
3828       # are talking about the secondary node
3829       for new_lv in dev.children:
3830         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3831                                         _GetInstanceInfoText(instance)):
3832           raise errors.OpExecError("Failed to create new LV named '%s' on"
3833                                    " node '%s'" %
3834                                    (new_lv.logical_id[1], new_node))
3835
3836       iv_names[dev.iv_name] = (dev, dev.children)
3837
3838     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3839     for dev in instance.disks:
3840       size = dev.size
3841       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3842       # create new devices on new_node
3843       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3844                               logical_id=(pri_node, new_node,
3845                                           dev.logical_id[2]),
3846                               children=dev.children)
3847       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3848                                         new_drbd, False,
3849                                       _GetInstanceInfoText(instance)):
3850         raise errors.OpExecError("Failed to create new DRBD on"
3851                                  " node '%s'" % new_node)
3852
3853     for dev in instance.disks:
3854       # we have new devices, shutdown the drbd on the old secondary
3855       info("shutting down drbd for %s on old node" % dev.iv_name)
3856       cfg.SetDiskID(dev, old_node)
3857       if not rpc.call_blockdev_shutdown(old_node, dev):
3858         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3859                 hint="Please cleanup this device manually as soon as possible")
3860
3861     info("detaching primary drbds from the network (=> standalone)")
3862     done = 0
3863     for dev in instance.disks:
3864       cfg.SetDiskID(dev, pri_node)
3865       # set the physical (unique in bdev terms) id to None, meaning
3866       # detach from network
3867       dev.physical_id = (None,) * len(dev.physical_id)
3868       # and 'find' the device, which will 'fix' it to match the
3869       # standalone state
3870       if rpc.call_blockdev_find(pri_node, dev):
3871         done += 1
3872       else:
3873         warning("Failed to detach drbd %s from network, unusual case" %
3874                 dev.iv_name)
3875
3876     if not done:
3877       # no detaches succeeded (very unlikely)
3878       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3879
3880     # if we managed to detach at least one, we update all the disks of
3881     # the instance to point to the new secondary
3882     info("updating instance configuration")
3883     for dev in instance.disks:
3884       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3885       cfg.SetDiskID(dev, pri_node)
3886     cfg.Update(instance)
3887
3888     # and now perform the drbd attach
3889     info("attaching primary drbds to new secondary (standalone => connected)")
3890     failures = []
3891     for dev in instance.disks:
3892       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3893       # since the attach is smart, it's enough to 'find' the device,
3894       # it will automatically activate the network, if the physical_id
3895       # is correct
3896       cfg.SetDiskID(dev, pri_node)
3897       if not rpc.call_blockdev_find(pri_node, dev):
3898         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3899                 "please do a gnt-instance info to see the status of disks")
3900
3901     # this can fail as the old devices are degraded and _WaitForSync
3902     # does a combined result over all disks, so we don't check its
3903     # return value
3904     self.proc.LogStep(5, steps_total, "sync devices")
3905     _WaitForSync(cfg, instance, self.proc, unlock=True)
3906
3907     # so check manually all the devices
3908     for name, (dev, old_lvs) in iv_names.iteritems():
3909       cfg.SetDiskID(dev, pri_node)
3910       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3911       if is_degr:
3912         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3913
3914     self.proc.LogStep(6, steps_total, "removing old storage")
3915     for name, (dev, old_lvs) in iv_names.iteritems():
3916       info("remove logical volumes for %s" % name)
3917       for lv in old_lvs:
3918         cfg.SetDiskID(lv, old_node)
3919         if not rpc.call_blockdev_remove(old_node, lv):
3920           warning("Can't remove LV on old secondary",
3921                   hint="Cleanup stale volumes by hand")
3922
3923   def Exec(self, feedback_fn):
3924     """Execute disk replacement.
3925
3926     This dispatches the disk replacement to the appropriate handler.
3927
3928     """
3929     instance = self.instance
3930     if instance.disk_template == constants.DT_REMOTE_RAID1:
3931       fn = self._ExecRR1
3932     elif instance.disk_template == constants.DT_DRBD8:
3933       if self.op.remote_node is None:
3934         fn = self._ExecD8DiskOnly
3935       else:
3936         fn = self._ExecD8Secondary
3937     else:
3938       raise errors.ProgrammerError("Unhandled disk replacement case")
3939     return fn(feedback_fn)
3940
3941
3942 class LUQueryInstanceData(NoHooksLU):
3943   """Query runtime instance data.
3944
3945   """
3946   _OP_REQP = ["instances"]
3947
3948   def CheckPrereq(self):
3949     """Check prerequisites.
3950
3951     This only checks the optional instance list against the existing names.
3952
3953     """
3954     if not isinstance(self.op.instances, list):
3955       raise errors.OpPrereqError("Invalid argument type 'instances'")
3956     if self.op.instances:
3957       self.wanted_instances = []
3958       names = self.op.instances
3959       for name in names:
3960         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3961         if instance is None:
3962           raise errors.OpPrereqError("No such instance name '%s'" % name)
3963       self.wanted_instances.append(instance)
3964     else:
3965       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3966                                in self.cfg.GetInstanceList()]
3967     return
3968
3969
3970   def _ComputeDiskStatus(self, instance, snode, dev):
3971     """Compute block device status.
3972
3973     """
3974     self.cfg.SetDiskID(dev, instance.primary_node)
3975     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3976     if dev.dev_type in constants.LDS_DRBD:
3977       # we change the snode then (otherwise we use the one passed in)
3978       if dev.logical_id[0] == instance.primary_node:
3979         snode = dev.logical_id[1]
3980       else:
3981         snode = dev.logical_id[0]
3982
3983     if snode:
3984       self.cfg.SetDiskID(dev, snode)
3985       dev_sstatus = rpc.call_blockdev_find(snode, dev)
3986     else:
3987       dev_sstatus = None
3988
3989     if dev.children:
3990       dev_children = [self._ComputeDiskStatus(instance, snode, child)
3991                       for child in dev.children]
3992     else:
3993       dev_children = []
3994
3995     data = {
3996       "iv_name": dev.iv_name,
3997       "dev_type": dev.dev_type,
3998       "logical_id": dev.logical_id,
3999       "physical_id": dev.physical_id,
4000       "pstatus": dev_pstatus,
4001       "sstatus": dev_sstatus,
4002       "children": dev_children,
4003       }
4004
4005     return data
4006
4007   def Exec(self, feedback_fn):
4008     """Gather and return data"""
4009     result = {}
4010     for instance in self.wanted_instances:
4011       remote_info = rpc.call_instance_info(instance.primary_node,
4012                                                 instance.name)
4013       if remote_info and "state" in remote_info:
4014         remote_state = "up"
4015       else:
4016         remote_state = "down"
4017       if instance.status == "down":
4018         config_state = "down"
4019       else:
4020         config_state = "up"
4021
4022       disks = [self._ComputeDiskStatus(instance, None, device)
4023                for device in instance.disks]
4024
4025       idict = {
4026         "name": instance.name,
4027         "config_state": config_state,
4028         "run_state": remote_state,
4029         "pnode": instance.primary_node,
4030         "snodes": instance.secondary_nodes,
4031         "os": instance.os,
4032         "memory": instance.memory,
4033         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4034         "disks": disks,
4035         "network_port": instance.network_port,
4036         "vcpus": instance.vcpus,
4037         }
4038
4039       result[instance.name] = idict
4040
4041     return result
4042
4043
4044 class LUSetInstanceParms(LogicalUnit):
4045   """Modifies an instances's parameters.
4046
4047   """
4048   HPATH = "instance-modify"
4049   HTYPE = constants.HTYPE_INSTANCE
4050   _OP_REQP = ["instance_name"]
4051
4052   def BuildHooksEnv(self):
4053     """Build hooks env.
4054
4055     This runs on the master, primary and secondaries.
4056
4057     """
4058     args = dict()
4059     if self.mem:
4060       args['memory'] = self.mem
4061     if self.vcpus:
4062       args['vcpus'] = self.vcpus
4063     if self.do_ip or self.do_bridge:
4064       if self.do_ip:
4065         ip = self.ip
4066       else:
4067         ip = self.instance.nics[0].ip
4068       if self.bridge:
4069         bridge = self.bridge
4070       else:
4071         bridge = self.instance.nics[0].bridge
4072       args['nics'] = [(ip, bridge)]
4073     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4074     nl = [self.sstore.GetMasterNode(),
4075           self.instance.primary_node] + list(self.instance.secondary_nodes)
4076     return env, nl, nl
4077
4078   def CheckPrereq(self):
4079     """Check prerequisites.
4080
4081     This only checks the instance list against the existing names.
4082
4083     """
4084     self.mem = getattr(self.op, "mem", None)
4085     self.vcpus = getattr(self.op, "vcpus", None)
4086     self.ip = getattr(self.op, "ip", None)
4087     self.mac = getattr(self.op, "mac", None)
4088     self.bridge = getattr(self.op, "bridge", None)
4089     if [self.mem, self.vcpus, self.ip, self.bridge, self.mac].count(None) == 5:
4090       raise errors.OpPrereqError("No changes submitted")
4091     if self.mem is not None:
4092       try:
4093         self.mem = int(self.mem)
4094       except ValueError, err:
4095         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4096     if self.vcpus is not None:
4097       try:
4098         self.vcpus = int(self.vcpus)
4099       except ValueError, err:
4100         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4101     if self.ip is not None:
4102       self.do_ip = True
4103       if self.ip.lower() == "none":
4104         self.ip = None
4105       else:
4106         if not utils.IsValidIP(self.ip):
4107           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4108     else:
4109       self.do_ip = False
4110     self.do_bridge = (self.bridge is not None)
4111     if self.mac is not None:
4112       if self.cfg.IsMacInUse(self.mac):
4113         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4114                                    self.mac)
4115       if not utils.IsValidMac(self.mac):
4116         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4117
4118     instance = self.cfg.GetInstanceInfo(
4119       self.cfg.ExpandInstanceName(self.op.instance_name))
4120     if instance is None:
4121       raise errors.OpPrereqError("No such instance name '%s'" %
4122                                  self.op.instance_name)
4123     self.op.instance_name = instance.name
4124     self.instance = instance
4125     return
4126
4127   def Exec(self, feedback_fn):
4128     """Modifies an instance.
4129
4130     All parameters take effect only at the next restart of the instance.
4131     """
4132     result = []
4133     instance = self.instance
4134     if self.mem:
4135       instance.memory = self.mem
4136       result.append(("mem", self.mem))
4137     if self.vcpus:
4138       instance.vcpus = self.vcpus
4139       result.append(("vcpus",  self.vcpus))
4140     if self.do_ip:
4141       instance.nics[0].ip = self.ip
4142       result.append(("ip", self.ip))
4143     if self.bridge:
4144       instance.nics[0].bridge = self.bridge
4145       result.append(("bridge", self.bridge))
4146     if self.mac:
4147       instance.nics[0].mac = self.mac
4148       result.append(("mac", self.mac))
4149
4150     self.cfg.AddInstance(instance)
4151
4152     return result
4153
4154
4155 class LUQueryExports(NoHooksLU):
4156   """Query the exports list
4157
4158   """
4159   _OP_REQP = []
4160
4161   def CheckPrereq(self):
4162     """Check that the nodelist contains only existing nodes.
4163
4164     """
4165     self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4166
4167   def Exec(self, feedback_fn):
4168     """Compute the list of all the exported system images.
4169
4170     Returns:
4171       a dictionary with the structure node->(export-list)
4172       where export-list is a list of the instances exported on
4173       that node.
4174
4175     """
4176     return rpc.call_export_list(self.nodes)
4177
4178
4179 class LUExportInstance(LogicalUnit):
4180   """Export an instance to an image in the cluster.
4181
4182   """
4183   HPATH = "instance-export"
4184   HTYPE = constants.HTYPE_INSTANCE
4185   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4186
4187   def BuildHooksEnv(self):
4188     """Build hooks env.
4189
4190     This will run on the master, primary node and target node.
4191
4192     """
4193     env = {
4194       "EXPORT_NODE": self.op.target_node,
4195       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4196       }
4197     env.update(_BuildInstanceHookEnvByObject(self.instance))
4198     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4199           self.op.target_node]
4200     return env, nl, nl
4201
4202   def CheckPrereq(self):
4203     """Check prerequisites.
4204
4205     This checks that the instance name is a valid one.
4206
4207     """
4208     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4209     self.instance = self.cfg.GetInstanceInfo(instance_name)
4210     if self.instance is None:
4211       raise errors.OpPrereqError("Instance '%s' not found" %
4212                                  self.op.instance_name)
4213
4214     # node verification
4215     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4216     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4217
4218     if self.dst_node is None:
4219       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4220                                  self.op.target_node)
4221     self.op.target_node = self.dst_node.name
4222
4223   def Exec(self, feedback_fn):
4224     """Export an instance to an image in the cluster.
4225
4226     """
4227     instance = self.instance
4228     dst_node = self.dst_node
4229     src_node = instance.primary_node
4230     # shutdown the instance, unless requested not to do so
4231     if self.op.shutdown:
4232       op = opcodes.OpShutdownInstance(instance_name=instance.name)
4233       self.proc.ChainOpCode(op)
4234
4235     vgname = self.cfg.GetVGName()
4236
4237     snap_disks = []
4238
4239     try:
4240       for disk in instance.disks:
4241         if disk.iv_name == "sda":
4242           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4243           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4244
4245           if not new_dev_name:
4246             logger.Error("could not snapshot block device %s on node %s" %
4247                          (disk.logical_id[1], src_node))
4248           else:
4249             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4250                                       logical_id=(vgname, new_dev_name),
4251                                       physical_id=(vgname, new_dev_name),
4252                                       iv_name=disk.iv_name)
4253             snap_disks.append(new_dev)
4254
4255     finally:
4256       if self.op.shutdown:
4257         op = opcodes.OpStartupInstance(instance_name=instance.name,
4258                                        force=False)
4259         self.proc.ChainOpCode(op)
4260
4261     # TODO: check for size
4262
4263     for dev in snap_disks:
4264       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4265                                            instance):
4266         logger.Error("could not export block device %s from node"
4267                      " %s to node %s" %
4268                      (dev.logical_id[1], src_node, dst_node.name))
4269       if not rpc.call_blockdev_remove(src_node, dev):
4270         logger.Error("could not remove snapshot block device %s from"
4271                      " node %s" % (dev.logical_id[1], src_node))
4272
4273     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4274       logger.Error("could not finalize export for instance %s on node %s" %
4275                    (instance.name, dst_node.name))
4276
4277     nodelist = self.cfg.GetNodeList()
4278     nodelist.remove(dst_node.name)
4279
4280     # on one-node clusters nodelist will be empty after the removal
4281     # if we proceed the backup would be removed because OpQueryExports
4282     # substitutes an empty list with the full cluster node list.
4283     if nodelist:
4284       op = opcodes.OpQueryExports(nodes=nodelist)
4285       exportlist = self.proc.ChainOpCode(op)
4286       for node in exportlist:
4287         if instance.name in exportlist[node]:
4288           if not rpc.call_export_remove(node, instance.name):
4289             logger.Error("could not remove older export for instance %s"
4290                          " on node %s" % (instance.name, node))
4291
4292
4293 class TagsLU(NoHooksLU):
4294   """Generic tags LU.
4295
4296   This is an abstract class which is the parent of all the other tags LUs.
4297
4298   """
4299   def CheckPrereq(self):
4300     """Check prerequisites.
4301
4302     """
4303     if self.op.kind == constants.TAG_CLUSTER:
4304       self.target = self.cfg.GetClusterInfo()
4305     elif self.op.kind == constants.TAG_NODE:
4306       name = self.cfg.ExpandNodeName(self.op.name)
4307       if name is None:
4308         raise errors.OpPrereqError("Invalid node name (%s)" %
4309                                    (self.op.name,))
4310       self.op.name = name
4311       self.target = self.cfg.GetNodeInfo(name)
4312     elif self.op.kind == constants.TAG_INSTANCE:
4313       name = self.cfg.ExpandInstanceName(self.op.name)
4314       if name is None:
4315         raise errors.OpPrereqError("Invalid instance name (%s)" %
4316                                    (self.op.name,))
4317       self.op.name = name
4318       self.target = self.cfg.GetInstanceInfo(name)
4319     else:
4320       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4321                                  str(self.op.kind))
4322
4323
4324 class LUGetTags(TagsLU):
4325   """Returns the tags of a given object.
4326
4327   """
4328   _OP_REQP = ["kind", "name"]
4329
4330   def Exec(self, feedback_fn):
4331     """Returns the tag list.
4332
4333     """
4334     return self.target.GetTags()
4335
4336
4337 class LUSearchTags(NoHooksLU):
4338   """Searches the tags for a given pattern.
4339
4340   """
4341   _OP_REQP = ["pattern"]
4342
4343   def CheckPrereq(self):
4344     """Check prerequisites.
4345
4346     This checks the pattern passed for validity by compiling it.
4347
4348     """
4349     try:
4350       self.re = re.compile(self.op.pattern)
4351     except re.error, err:
4352       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4353                                  (self.op.pattern, err))
4354
4355   def Exec(self, feedback_fn):
4356     """Returns the tag list.
4357
4358     """
4359     cfg = self.cfg
4360     tgts = [("/cluster", cfg.GetClusterInfo())]
4361     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4362     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4363     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4364     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4365     results = []
4366     for path, target in tgts:
4367       for tag in target.GetTags():
4368         if self.re.search(tag):
4369           results.append((path, tag))
4370     return results
4371
4372
4373 class LUAddTags(TagsLU):
4374   """Sets a tag on a given object.
4375
4376   """
4377   _OP_REQP = ["kind", "name", "tags"]
4378
4379   def CheckPrereq(self):
4380     """Check prerequisites.
4381
4382     This checks the type and length of the tag name and value.
4383
4384     """
4385     TagsLU.CheckPrereq(self)
4386     for tag in self.op.tags:
4387       objects.TaggableObject.ValidateTag(tag)
4388
4389   def Exec(self, feedback_fn):
4390     """Sets the tag.
4391
4392     """
4393     try:
4394       for tag in self.op.tags:
4395         self.target.AddTag(tag)
4396     except errors.TagError, err:
4397       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4398     try:
4399       self.cfg.Update(self.target)
4400     except errors.ConfigurationError:
4401       raise errors.OpRetryError("There has been a modification to the"
4402                                 " config file and the operation has been"
4403                                 " aborted. Please retry.")
4404
4405
4406 class LUDelTags(TagsLU):
4407   """Delete a list of tags from a given object.
4408
4409   """
4410   _OP_REQP = ["kind", "name", "tags"]
4411
4412   def CheckPrereq(self):
4413     """Check prerequisites.
4414
4415     This checks that we have the given tag.
4416
4417     """
4418     TagsLU.CheckPrereq(self)
4419     for tag in self.op.tags:
4420       objects.TaggableObject.ValidateTag(tag)
4421     del_tags = frozenset(self.op.tags)
4422     cur_tags = self.target.GetTags()
4423     if not del_tags <= cur_tags:
4424       diff_tags = del_tags - cur_tags
4425       diff_names = ["'%s'" % tag for tag in diff_tags]
4426       diff_names.sort()
4427       raise errors.OpPrereqError("Tag(s) %s not found" %
4428                                  (",".join(diff_names)))
4429
4430   def Exec(self, feedback_fn):
4431     """Remove the tag from the object.
4432
4433     """
4434     for tag in self.op.tags:
4435       self.target.RemoveTag(tag)
4436     try:
4437       self.cfg.Update(self.target)
4438     except errors.ConfigurationError:
4439       raise errors.OpRetryError("There has been a modification to the"
4440                                 " config file and the operation has been"
4441                                 " aborted. Please retry.")