4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Client side of the RPC layer used to call procedures on the node daemons.
26 # pylint: disable-msg=C0103
30 from twisted.internet.pollreactor import PollReactor
class ReReactor(PollReactor):
    """A re-startable Reactor implementation.

    """
    def run(self, installSignalHandlers=1):
        """Custom run method.

        Before delegating to PollReactor.run, this puts back what a
        previous run has torn down: the 'shutdown' event triggers (when
        they are no longer registered) and the threadpool (when it
        exists but has been joined).

        Args:
          - installSignalHandlers: passed unchanged to PollReactor.run

        Returns:
          whatever PollReactor.run returns

        """
        if 'shutdown' not in self._eventTriggers:
            # the shutdown queue is gone, so this is most probably a
            # second run: register the shutdown handlers again
            for handler in (self.crash, self.disconnectAll):
                self.addSystemEventTrigger('during', 'shutdown', handler)

        pool = self.threadpool
        if pool is not None and pool.joined == 1:
            # the pool was stopped: start it again and make sure it is
            # stopped once more when the reactor shuts down
            pool.start()
            self.addSystemEventTrigger('during', 'shutdown', pool.stop)

        return PollReactor.run(self, installSignalHandlers)
# Install the re-startable reactor here, ahead of the
# "from twisted.internet import reactor" import further down, so that the
# reactor name bound there is a ReReactor instance.
import twisted.internet.main
twisted.internet.main.installReactor(ReReactor())
62 from twisted.spread import pb
63 from twisted.internet import reactor
64 from twisted.cred import credentials
65 from OpenSSL import SSL, crypto
67 from ganeti import logger
68 from ganeti import utils
69 from ganeti import errors
70 from ganeti import constants
71 from ganeti import objects
72 from ganeti import ssconf
class NodeController:
    """Node-handling class.

    For each node that we speak with, we create an instance of this
    class, so that we have a safe place to store the details of this
    individual call.

    """
    def __init__(self, parent, node):
        """Bind the controller to its client and to one node.

        Args:
          - parent: the object whose procedure, args, results and nc
            attributes are read and updated by the callbacks
          - node: the node handled by this controller; it is the key
            used when storing into parent.results

        """
        self.parent = parent
        self.node = node

    def check_end(self):
        """Stop the reactor if we got all the results.

        All results are in once parent.results has as many entries as
        parent.nc.

        """
        if len(self.parent.results) == len(self.parent.nc):
            reactor.stop()

    def cb_call(self, obj):
        """Callback for successful connect.

        If the connect and login sequence succeeded, we proceed with
        making the actual call.

        Args:
          - obj: the remote reference on which callRemote is invoked

        """
        deferred = obj.callRemote(self.parent.procedure, self.parent.args)
        deferred.addCallbacks(self.cb_done, self.cb_err2)

    def cb_done(self, result):
        """Callback for successful call.

        When we receive the result from a call, we check if it was an
        error and if so we raise a generic RemoteError (we can't yet
        pass the actual exception over). The payload is stored in
        parent.results in both cases.

        Args:
          - result: a (traceback, payload) pair; a true traceback value
            marks a remote error

        Raises:
          errors.RemoteError: if the traceback part of result is true

        """
        tb, self.parent.results[self.node] = result
        # account for this node before raising, otherwise an error on the
        # last outstanding node would never stop the reactor
        self.check_end()
        if tb:
            raise errors.RemoteError("Remote procedure error calling %s on %s:"
                                     "\n%s" % (self.parent.procedure,
                                               self.node,
                                               tb))

    def cb_err1(self, reason):
        """Error callback for unsuccessful connect.

        The failure is logged and False is stored as this node's result.

        """
        logger.Error("caller_connect: could not connect to remote host %s,"
                     " reason %s" % (self.node, reason))
        self.parent.results[self.node] = False
        self.check_end()

    def cb_err2(self, reason):
        """Error callback for unsuccessful call.

        This is when the call didn't return anything, not even an error,
        or when it timed out, etc. The failure is logged and False is
        stored as this node's result.

        """
        logger.Error("caller_call: could not call %s on node %s,"
                     " reason %s" % (self.parent.procedure, self.node, reason))
        self.parent.results[self.node] = False
        self.check_end()
142 class MirrorContextFactory:
143 """Certificate verifier factory.
145 This factory creates contexts that verify if the remote end has a
146 specific certificate (i.e. our own certificate).
148 The checks we do are that the PEM dump of the certificate is the
149 same as our own and (somewhat redundantly) that the SHA checksum is
157 fd = open(constants.SSL_CERT_FILE, 'r')
159 data = fd.read(16384)
162 except EnvironmentError, err:
163 raise errors.ConfigurationError("missing SSL certificate: %s" %
165 self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
166 self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
167 self.mydigest = self.mycert.digest('SHA')
169 def verifier(self, conn, x509, errno, err_depth, retcode):
170 """Certificate verify method.
173 if self.mydigest != x509.digest('SHA'):
175 if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
179 def getContext(self):
180 """Context generator.
183 context = SSL.Context(SSL.TLSv1_METHOD)
184 context.set_verify(SSL.VERIFY_PEER, self.verifier)
class Client:
    """RPC Client class.

    This class, given a (remote) method name, a list of parameters and a
    list of nodes, will contact (in parallel) all nodes, and return a
    dict of results (key: node name, value: result).

    One current bug is that generic failure is still signalled by
    'False' result, which is not good. This overloading of values can
    cause bugs.

    """
    def __init__(self, procedure, args):
        """Prepare a call of the given procedure.

        The node daemon port and password are read from the simple
        store; no connection is made until connect/connect_list.

        Args:
          - procedure: name of the remote procedure
          - args: list of arguments sent along with the call

        """
        ss = ssconf.SimpleStore()
        self.port = ss.GetNodeDaemonPort()
        self.nodepw = ss.GetNodeDaemonPassword()
        # per-node controllers and per-node results, both keyed by node
        self.nc = {}
        self.results = {}
        self.procedure = procedure
        self.args = args

    #--- generic connector -------------

    def connect_list(self, node_list):
        """Add a list of nodes to the target nodes.

        """
        for node in node_list:
            self.connect(node)

    def connect(self, connect_node):
        """Add a node to the target list.

        An SSL connection to the node is queued on the reactor, followed
        by a login as "master_node"; the node's controller then either
        makes the call (cb_call) or records the failure (cb_err1).

        """
        factory = pb.PBClientFactory()
        self.nc[connect_node] = nc = NodeController(self, connect_node)
        reactor.connectSSL(connect_node, self.port, factory,
                           MirrorContextFactory())
        d = factory.login(credentials.UsernamePassword("master_node",
                                                       self.nodepw))
        d.addCallbacks(nc.cb_call, nc.cb_err1)

    def getresult(self):
        """Return the results of the call.

        """
        return self.results

    def run(self):
        """Wrapper over reactor.run().

        This function simply calls reactor.run() if we have any requests
        queued, otherwise it does nothing.

        """
        if self.nc:
            reactor.run()
def call_volume_list(node_list, vg_name):
    """Gets the logical volumes present in a given volume group.

    This is a multi-node call.

    Args:
      - node_list: the nodes to contact
      - vg_name: the volume group name sent to the remote procedure

    Returns:
      the dict of results, keyed by node

    """
    c = Client("volume_list", [vg_name])
    c.connect_list(node_list)
    c.run()
    return c.getresult()
def call_vg_list(node_list):
    """Gets the volume group list.

    This is a multi-node call.

    Args:
      - node_list: the nodes to contact

    Returns:
      the dict of results, keyed by node

    """
    c = Client("vg_list", [])
    c.connect_list(node_list)
    c.run()
    return c.getresult()
def call_bridges_exist(node, bridges_list):
    """Checks if a node has all the bridges given.

    This method checks if all bridges given in the bridges_list are
    present on the remote node, so that an instance that uses interfaces
    on those bridges can be started.

    This is a single-node call.

    Returns:
      the result recorded for the node, or False if there is none

    """
    c = Client("bridges_exist", [bridges_list])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_instance_start(node, instance, extra_args):
    """Starts an instance.

    This is a single-node call.

    Args:
      - node: the node to contact
      - instance: object providing ToDict(); sent in its dict form
      - extra_args: passed unchanged to the remote procedure

    Returns:
      the result recorded for the node, or False if there is none

    """
    c = Client("instance_start", [instance.ToDict(), extra_args])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_instance_shutdown(node, instance):
    """Stops an instance.

    This is a single-node call.

    Args:
      - node: the node to contact
      - instance: object providing ToDict(); sent in its dict form

    Returns:
      the result recorded for the node, or False if there is none

    """
    c = Client("instance_shutdown", [instance.ToDict()])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_instance_reboot(node, instance, reboot_type, extra_args):
    """Reboots an instance.

    This is a single-node call.

    Args:
      - node: the node to contact
      - instance: object providing ToDict(); sent in its dict form
      - reboot_type: passed unchanged to the remote procedure
      - extra_args: passed unchanged to the remote procedure

    Returns:
      the result recorded for the node, or False if there is none

    """
    c = Client("instance_reboot",
               [instance.ToDict(), reboot_type, extra_args])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_instance_os_add(node, inst, osdev, swapdev):
    """Installs an OS on the given instance.

    This is a single-node call.

    Args:
      - node: the node to contact
      - inst: object providing ToDict(); sent in its dict form
      - osdev: passed unchanged to the remote procedure
      - swapdev: passed unchanged to the remote procedure

    Returns:
      the result recorded for the node, or False if there is none

    """
    params = [inst.ToDict(), osdev, swapdev]
    c = Client("instance_os_add", params)
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
    """Run the OS rename script for an instance.

    This is a single-node call.

    Args:
      - node: the node to contact
      - inst: object providing ToDict(); sent in its dict form
      - old_name: passed unchanged to the remote procedure
      - osdev: passed unchanged to the remote procedure
      - swapdev: passed unchanged to the remote procedure

    Returns:
      the result recorded for the node, or False if there is none

    """
    params = [inst.ToDict(), old_name, osdev, swapdev]
    c = Client("instance_run_rename", params)
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_instance_info(node, instance):
    """Returns information about a single instance.

    This is a single-node call.

    Args:
      - node: the node to contact
      - instance: passed unchanged to the remote procedure

    Returns:
      the result recorded for the node, or False if there is none

    """
    c = Client("instance_info", [instance])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_all_instances_info(node_list):
    """Returns information about all instances on the given nodes.

    This is a multi-node call.

    Args:
      - node_list: the nodes to contact

    Returns:
      the dict of results, keyed by node

    """
    c = Client("all_instances_info", [])
    c.connect_list(node_list)
    c.run()
    return c.getresult()
def call_instance_list(node_list):
    """Returns the list of running instances on the given nodes.

    This is a multi-node call.

    Args:
      - node_list: the nodes to contact

    Returns:
      the dict of results, keyed by node

    """
    c = Client("instance_list", [])
    c.connect_list(node_list)
    c.run()
    return c.getresult()
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
    """Do a TcpPing on the remote node

    This is a single-node call.

    Args:
      - node: the node to contact
      - source, target, port, timeout, live_port_needed: passed
        unchanged, in this order, to the remote procedure

    Returns:
      the result recorded for the node, or False if there is none

    """
    c = Client("node_tcp_ping", [source, target, port, timeout,
                                 live_port_needed])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
400 def call_node_info(node_list, vg_name):
401 """Return node information.
403 This will return memory information and volume group size and free
406 This is a multi-node call.
409 c = Client("node_info", [vg_name])
410 c.connect_list(node_list)
412 retux = c.getresult()
414 for node_name in retux:
415 ret = retux.get(node_name, False)
416 if type(ret) != dict:
417 logger.Error("could not connect to node %s" % (node_name))
421 { 'memory_total' : '-',
424 'vg_size' : 'node_unreachable',
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
    """Add a node to the cluster.

    This is a single-node call.

    Args:
      - node: the node to contact
      - dsa, dsapub, rsa, rsapub, ssh, sshpub: passed unchanged, in
        this order, to the remote procedure

    Returns:
      the result recorded for the node, or False if there is none

    """
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
    c = Client("node_add", params)
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_node_verify(node_list, checkdict):
    """Request verification of given parameters.

    This is a multi-node call.

    Args:
      - node_list: the nodes to contact
      - checkdict: passed unchanged to the remote procedure

    Returns:
      the dict of results, keyed by node

    """
    c = Client("node_verify", [checkdict])
    c.connect_list(node_list)
    c.run()
    return c.getresult()
def call_node_start_master(node):
    """Tells a node to activate itself as a master.

    This is a single-node call.

    Returns:
      the result recorded for the node, or False if there is none

    """
    c = Client("node_start_master", [])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_node_stop_master(node):
    """Tells a node to demote itself from master status.

    This is a single-node call.

    Returns:
      the result recorded for the node, or False if there is none

    """
    c = Client("node_stop_master", [])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_version(node_list):
    """Query node version.

    This is a multi-node call.

    Args:
      - node_list: the nodes to contact

    Returns:
      the dict of results, keyed by node

    """
    c = Client("version", [])
    c.connect_list(node_list)
    c.run()
    return c.getresult()
def call_blockdev_create(node, bdev, size, owner, on_primary, info):
    """Request creation of a given block device.

    This is a single-node call.

    Args:
      - node: the node to contact
      - bdev: object providing ToDict(); sent in its dict form
      - size, owner, on_primary, info: passed unchanged, in this
        order, to the remote procedure

    Returns:
      the result recorded for the node, or False if there is none

    """
    params = [bdev.ToDict(), size, owner, on_primary, info]
    c = Client("blockdev_create", params)
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_blockdev_remove(node, bdev):
    """Request removal of a given block device.

    This is a single-node call.

    Args:
      - node: the node to contact
      - bdev: object providing ToDict(); sent in its dict form

    Returns:
      the result recorded for the node, or False if there is none

    """
    c = Client("blockdev_remove", [bdev.ToDict()])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_blockdev_rename(node, devlist):
    """Request rename of the given block devices.

    This is a single-node call.

    Args:
      - node: the node to contact
      - devlist: iterable of (device, uid) pairs; each device must
        provide ToDict() and is sent in its dict form

    Returns:
      the result recorded for the node, or False if there is none

    """
    # the list of (dict, uid) pairs is itself the argument list
    params = [(d.ToDict(), uid) for d, uid in devlist]
    c = Client("blockdev_rename", params)
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_blockdev_assemble(node, disk, owner, on_primary):
    """Request assembling of a given block device.

    This is a single-node call.

    Args:
      - node: the node to contact
      - disk: object providing ToDict(); sent in its dict form
      - owner, on_primary: passed unchanged to the remote procedure

    Returns:
      the result recorded for the node, or False if there is none

    """
    params = [disk.ToDict(), owner, on_primary]
    c = Client("blockdev_assemble", params)
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_blockdev_shutdown(node, disk):
    """Request shutdown of a given block device.

    This is a single-node call.

    Args:
      - node: the node to contact
      - disk: object providing ToDict(); sent in its dict form

    Returns:
      the result recorded for the node, or False if there is none

    """
    c = Client("blockdev_shutdown", [disk.ToDict()])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_blockdev_addchildren(node, bdev, ndevs):
    """Request adding a list of children to a (mirroring) device.

    This is a single-node call.

    Args:
      - node: the node to contact
      - bdev: object providing ToDict(); sent in its dict form
      - ndevs: iterable of objects providing ToDict(); sent as a list
        of dicts

    Returns:
      the result recorded for the node, or False if there is none

    """
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
    c = Client("blockdev_addchildren", params)
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_blockdev_removechildren(node, bdev, ndevs):
    """Request removing a list of children from a (mirroring) device.

    This is a single-node call.

    Args:
      - node: the node to contact
      - bdev: object providing ToDict(); sent in its dict form
      - ndevs: iterable of objects providing ToDict(); sent as a list
        of dicts

    Returns:
      the result recorded for the node, or False if there is none

    """
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
    c = Client("blockdev_removechildren", params)
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_blockdev_getmirrorstatus(node, disks):
    """Request status of a (mirroring) device.

    This is a single-node call.

    Args:
      - node: the node to contact
      - disks: iterable of objects providing ToDict(); each is sent in
        its dict form

    Returns:
      the result recorded for the node, or False if there is none

    """
    # the list of disk dicts is itself the argument list
    params = [dsk.ToDict() for dsk in disks]
    c = Client("blockdev_getmirrorstatus", params)
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_blockdev_find(node, disk):
    """Request identification of a given block device.

    This is a single-node call.

    Args:
      - node: the node to contact
      - disk: object providing ToDict(); sent in its dict form

    Returns:
      the result recorded for the node, or False if there is none

    """
    c = Client("blockdev_find", [disk.ToDict()])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_upload_file(node_list, file_name):
    """Upload a file.

    The contents of the local file are sent together with its name,
    mode, owner, group and access/modification times. The node will
    refuse the operation in case the file is not one it accepts.

    This is a multi-node call.

    Args:
      - node_list: the nodes to contact
      - file_name: path of the local file to read and send

    Returns:
      the dict of results, keyed by node

    """
    fh = open(file_name)
    try:
        data = fh.read()
    finally:
        # close the file even if the read fails
        fh.close()
    st = os.stat(file_name)
    params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
              st.st_atime, st.st_mtime]
    c = Client("upload_file", params)
    c.connect_list(node_list)
    c.run()
    return c.getresult()
def call_os_diagnose(node_list):
    """Request a diagnose of OS definitions.

    Every entry returned by a node is turned back into an object:
    dicts become objects.OS instances and 3-element tuples become
    errors.InvalidOS instances.

    This is a multi-node call.

    Args:
      - node_list: the nodes to contact

    Returns:
      a dict mapping each node to the list of converted entries; the
      list is empty when the node's result is false

    Raises:
      errors.ProgrammerError: if a non-empty entry is neither a dict
        nor a 3-element tuple

    """
    c = Client("os_diagnose", [])
    c.connect_list(node_list)
    c.run()
    result = c.getresult()
    new_result = {}
    for node_name in result:
        nr = []
        if result[node_name]:
            for data in result[node_name]:
                if data:
                    if isinstance(data, dict):
                        nr.append(objects.OS.FromDict(data))
                    elif isinstance(data, tuple) and len(data) == 3:
                        nr.append(errors.InvalidOS(data[0], data[1],
                                                   data[2]))
                    else:
                        raise errors.ProgrammerError("Invalid data from"
                                                     " xcserver.os_diagnose")
        new_result[node_name] = nr
    return new_result
def call_os_get(node, name):
    """Returns an OS definition.

    A dict result is converted to an objects.OS instance and a
    3-element tuple to an errors.InvalidOS instance; anything else
    (including the False used for failures) is returned unchanged.

    This is a single-node call.

    Args:
      - node: the node to contact
      - name: passed unchanged to the remote procedure

    """
    c = Client("os_get", [name])
    c.connect(node)
    c.run()
    result = c.getresult().get(node, False)
    if isinstance(result, dict):
        new_result = objects.OS.FromDict(result)
    # the length check must look at result itself; no other name is
    # bound in this function
    elif isinstance(result, tuple) and len(result) == 3:
        new_result = errors.InvalidOS(result[0], result[1], result[2])
    else:
        new_result = result
    return new_result
def call_hooks_runner(node_list, hpath, phase, env):
    """Call the hooks runner.

    Args:
      - node_list: the nodes to contact
      - hpath: passed unchanged to the remote procedure
      - phase: passed unchanged to the remote procedure
      - env: a dictionary with the environment

    This is a multi-node call.

    Returns:
      the dict of results, keyed by node

    """
    params = [hpath, phase, env]
    c = Client("hooks_runner", params)
    c.connect_list(node_list)
    c.run()
    return c.getresult()
def call_blockdev_snapshot(node, cf_bdev):
    """Request a snapshot of the given block device.

    This is a single-node call.

    Args:
      - node: the node to contact
      - cf_bdev: object providing ToDict(); sent in its dict form

    Returns:
      the result recorded for the node, or False if there is none

    """
    c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_snapshot_export(node, snap_bdev, dest_node, instance):
    """Request the export of a given snapshot.

    This is a single-node call.

    Args:
      - node: the node to contact
      - snap_bdev: object providing ToDict(); sent in its dict form
      - dest_node: passed unchanged to the remote procedure
      - instance: object providing ToDict(); sent in its dict form

    Returns:
      the result recorded for the node, or False if there is none

    """
    params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
    c = Client("snapshot_export", params)
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_finalize_export(node, instance, snap_disks):
    """Request the completion of an export operation.

    This writes the export config file, etc.

    This is a single-node call.

    Args:
      - node: the node to contact
      - instance: object providing ToDict(); sent in its dict form
      - snap_disks: iterable of objects providing ToDict(); sent as a
        list of dicts

    Returns:
      the result recorded for the node, or False if there is none

    """
    flat_disks = [disk.ToDict() for disk in snap_disks]
    params = [instance.ToDict(), flat_disks]
    c = Client("finalize_export", params)
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_export_info(node, path):
    """Queries the export information in a given path.

    This is a single-node call.

    Args:
      - node: the node to contact
      - path: passed unchanged to the remote procedure

    Returns:
      the node's result parsed by objects.SerializableConfigParser.Loads,
      or the raw result itself when it is false (e.g. the False used
      for failures)

    """
    c = Client("export_info", [path])
    c.connect(node)
    c.run()
    result = c.getresult().get(node, False)
    if not result:
        # nothing to parse; hand the false value back unchanged
        return result
    return objects.SerializableConfigParser.Loads(result)
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
    """Request the import of a backup into an instance.

    This is a single-node call.

    Args:
      - node: the node to contact
      - inst: object providing ToDict(); sent in its dict form
      - osdev, swapdev, src_node, src_image: passed unchanged, in this
        order, to the remote procedure

    Returns:
      the result recorded for the node, or False if there is none

    """
    params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
    c = Client("instance_os_import", params)
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_export_list(node_list):
    """Gets the stored exports list.

    This is a multi-node call.

    Args:
      - node_list: the nodes to contact

    Returns:
      the dict of results, keyed by node

    """
    c = Client("export_list", [])
    c.connect_list(node_list)
    c.run()
    return c.getresult()
def call_export_remove(node, export):
    """Requests removal of a given export.

    This is a single-node call.

    Args:
      - node: the node to contact
      - export: passed unchanged to the remote procedure

    Returns:
      the result recorded for the node, or False if there is none

    """
    c = Client("export_remove", [export])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
def call_node_leave_cluster(node):
    """Requests a node to clean the cluster information it has.

    This will remove the configuration information from the ganeti data
    kept on the node.

    This is a single-node call.

    Returns:
      the result recorded for the node, or False if there is none

    """
    c = Client("node_leave_cluster", [])
    c.connect(node)
    c.run()
    return c.getresult().get(node, False)
806 def call_node_volumes(node_list):
807 """Gets all volumes on node(s).
809 This is a multi-node call.
812 c = Client("node_volumes", [])
813 c.connect_list(node_list)