4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
import os
import sys
import signal
import logging

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils

import ganeti.http.server
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Serializes access to the job queue: the wrapped function runs with
  the module-level C{queue_lock} held exclusively, and the lock is
  released again even if the function raises.

  @param fn: the function to wrap
  @return: the wrapped function

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      # always release, otherwise one failed RPC would wedge the queue
      queue_lock.Unlock()

  return wrapper
70 class NodeHttpServer(http.server.HttpServer):
71 """The server implementation.
73 This class holds all methods exposed over the RPC interface.
76 def __init__(self, *args, **kwargs):
77 http.server.HttpServer.__init__(self, *args, **kwargs)
78 self.noded_pid = os.getpid()
80 def HandleRequest(self, req):
84 if req.request_method.upper() != http.HTTP_PUT:
85 raise http.HttpBadRequest()
87 path = req.request_path
88 if path.startswith("/"):
91 method = getattr(self, "perspective_%s" % path, None)
93 raise http.HttpNotFound()
97 return method(req.request_body)
98 except backend.RPCFail, err:
99 # our custom failure exception; str(err) works fine if the
100 # exception was constructed with a single argument, and in
101 # this case, err.message == err.args[0] == str(err)
102 return (False, str(err))
104 logging.exception("Error in RPC call")
106 except errors.QuitGanetiException, err:
107 # Tell parent to quit
108 os.kill(self.noded_pid, signal.SIGTERM)
110 # the new block devices --------------------------
113 def perspective_blockdev_create(params):
114 """Create a block device.
117 bdev_s, size, owner, on_primary, info = params
118 bdev = objects.Disk.FromDict(bdev_s)
120 raise ValueError("can't unserialize data!")
121 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
124 def perspective_blockdev_remove(params):
125 """Remove a block device.
129 bdev = objects.Disk.FromDict(bdev_s)
130 return backend.BlockdevRemove(bdev)
133 def perspective_blockdev_rename(params):
134 """Remove a block device.
137 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
138 return backend.BlockdevRename(devlist)
141 def perspective_blockdev_assemble(params):
142 """Assemble a block device.
145 bdev_s, owner, on_primary = params
146 bdev = objects.Disk.FromDict(bdev_s)
148 raise ValueError("can't unserialize data!")
149 return backend.BlockdevAssemble(bdev, owner, on_primary)
152 def perspective_blockdev_shutdown(params):
153 """Shutdown a block device.
157 bdev = objects.Disk.FromDict(bdev_s)
159 raise ValueError("can't unserialize data!")
160 return backend.BlockdevShutdown(bdev)
163 def perspective_blockdev_addchildren(params):
164 """Add a child to a mirror device.
166 Note: this is only valid for mirror devices. It's the caller's duty
167 to send a correct disk, otherwise we raise an error.
170 bdev_s, ndev_s = params
171 bdev = objects.Disk.FromDict(bdev_s)
172 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
173 if bdev is None or ndevs.count(None) > 0:
174 raise ValueError("can't unserialize data!")
175 return backend.BlockdevAddchildren(bdev, ndevs)
178 def perspective_blockdev_removechildren(params):
179 """Remove a child from a mirror device.
181 This is only valid for mirror devices, of course. It's the callers
182 duty to send a correct disk, otherwise we raise an error.
185 bdev_s, ndev_s = params
186 bdev = objects.Disk.FromDict(bdev_s)
187 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
188 if bdev is None or ndevs.count(None) > 0:
189 raise ValueError("can't unserialize data!")
190 return backend.BlockdevRemovechildren(bdev, ndevs)
193 def perspective_blockdev_getmirrorstatus(params):
194 """Return the mirror status for a list of disks.
197 disks = [objects.Disk.FromDict(dsk_s)
199 return backend.BlockdevGetmirrorstatus(disks)
202 def perspective_blockdev_find(params):
203 """Expose the FindBlockDevice functionality for a disk.
205 This will try to find but not activate a disk.
208 disk = objects.Disk.FromDict(params[0])
209 return backend.BlockdevFind(disk)
212 def perspective_blockdev_snapshot(params):
213 """Create a snapshot device.
215 Note that this is only valid for LVM disks, if we get passed
216 something else we raise an exception. The snapshot device can be
217 remove by calling the generic block device remove call.
220 cfbd = objects.Disk.FromDict(params[0])
221 return backend.BlockdevSnapshot(cfbd)
224 def perspective_blockdev_grow(params):
225 """Grow a stack of devices.
228 cfbd = objects.Disk.FromDict(params[0])
230 return backend.BlockdevGrow(cfbd, amount)
233 def perspective_blockdev_close(params):
234 """Closes the given block devices.
237 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
238 return backend.BlockdevClose(params[0], disks)
240 # blockdev/drbd specific methods ----------
243 def perspective_drbd_disconnect_net(params):
244 """Disconnects the network connection of drbd disks.
246 Note that this is only valid for drbd disks, so the members of the
247 disk list must all be drbd devices.
250 nodes_ip, disks = params
251 disks = [objects.Disk.FromDict(cf) for cf in disks]
252 return backend.DrbdDisconnectNet(nodes_ip, disks)
255 def perspective_drbd_attach_net(params):
256 """Attaches the network connection of drbd disks.
258 Note that this is only valid for drbd disks, so the members of the
259 disk list must all be drbd devices.
262 nodes_ip, disks, instance_name, multimaster = params
263 disks = [objects.Disk.FromDict(cf) for cf in disks]
264 return backend.DrbdAttachNet(nodes_ip, disks,
265 instance_name, multimaster)
268 def perspective_drbd_wait_sync(params):
269 """Wait until DRBD disks are synched.
271 Note that this is only valid for drbd disks, so the members of the
272 disk list must all be drbd devices.
275 nodes_ip, disks = params
276 disks = [objects.Disk.FromDict(cf) for cf in disks]
277 return backend.DrbdWaitSync(nodes_ip, disks)
279 # export/import --------------------------
282 def perspective_snapshot_export(params):
283 """Export a given snapshot.
286 disk = objects.Disk.FromDict(params[0])
287 dest_node = params[1]
288 instance = objects.Instance.FromDict(params[2])
289 cluster_name = params[3]
291 return backend.ExportSnapshot(disk, dest_node, instance,
292 cluster_name, dev_idx)
295 def perspective_finalize_export(params):
296 """Expose the finalize export functionality.
299 instance = objects.Instance.FromDict(params[0])
300 snap_disks = [objects.Disk.FromDict(str_data)
301 for str_data in params[1]]
302 return backend.FinalizeExport(instance, snap_disks)
305 def perspective_export_info(params):
306 """Query information about an existing export on this node.
308 The given path may not contain an export, in which case we return
313 return backend.ExportInfo(path)
316 def perspective_export_list(params):
317 """List the available exports on this node.
319 Note that as opposed to export_info, which may query data about an
320 export in any path, this only queries the standard Ganeti path
321 (constants.EXPORT_DIR).
324 return backend.ListExports()
327 def perspective_export_remove(params):
332 return backend.RemoveExport(export)
334 # volume --------------------------
337 def perspective_volume_list(params):
338 """Query the list of logical volumes in a given volume group.
342 return True, backend.GetVolumeList(vgname)
345 def perspective_vg_list(params):
346 """Query the list of volume groups.
349 return backend.ListVolumeGroups()
351 # bridge --------------------------
354 def perspective_bridges_exist(params):
355 """Check if all bridges given exist on this node.
358 bridges_list = params[0]
359 return backend.BridgesExist(bridges_list)
361 # instance --------------------------
364 def perspective_instance_os_add(params):
365 """Install an OS on a given instance.
369 inst = objects.Instance.FromDict(inst_s)
370 reinstall = params[1]
371 return backend.InstanceOsAdd(inst, reinstall)
374 def perspective_instance_run_rename(params):
375 """Runs the OS rename script for an instance.
378 inst_s, old_name = params
379 inst = objects.Instance.FromDict(inst_s)
380 return backend.RunRenameInstance(inst, old_name)
383 def perspective_instance_os_import(params):
384 """Run the import function of an OS onto a given instance.
387 inst_s, src_node, src_images, cluster_name = params
388 inst = objects.Instance.FromDict(inst_s)
389 return backend.ImportOSIntoInstance(inst, src_node, src_images,
393 def perspective_instance_shutdown(params):
394 """Shutdown an instance.
397 instance = objects.Instance.FromDict(params[0])
398 return backend.InstanceShutdown(instance)
401 def perspective_instance_start(params):
402 """Start an instance.
405 instance = objects.Instance.FromDict(params[0])
406 return backend.StartInstance(instance)
409 def perspective_migration_info(params):
410 """Gather information about an instance to be migrated.
413 instance = objects.Instance.FromDict(params[0])
414 return backend.MigrationInfo(instance)
417 def perspective_accept_instance(params):
418 """Prepare the node to accept an instance.
421 instance, info, target = params
422 instance = objects.Instance.FromDict(instance)
423 return backend.AcceptInstance(instance, info, target)
426 def perspective_finalize_migration(params):
427 """Finalize the instance migration.
430 instance, info, success = params
431 instance = objects.Instance.FromDict(instance)
432 return backend.FinalizeMigration(instance, info, success)
435 def perspective_instance_migrate(params):
436 """Migrates an instance.
439 instance, target, live = params
440 instance = objects.Instance.FromDict(instance)
441 return backend.MigrateInstance(instance, target, live)
444 def perspective_instance_reboot(params):
445 """Reboot an instance.
448 instance = objects.Instance.FromDict(params[0])
449 reboot_type = params[1]
450 return backend.InstanceReboot(instance, reboot_type)
453 def perspective_instance_info(params):
454 """Query instance information.
457 return backend.GetInstanceInfo(params[0], params[1])
460 def perspective_instance_migratable(params):
461 """Query whether the specified instance can be migrated.
464 instance = objects.Instance.FromDict(params[0])
465 return backend.GetInstanceMigratable(instance)
468 def perspective_all_instances_info(params):
469 """Query information about all instances.
472 return backend.GetAllInstancesInfo(params[0])
475 def perspective_instance_list(params):
476 """Query the list of running instances.
479 return True, backend.GetInstanceList(params[0])
481 # node --------------------------
484 def perspective_node_tcp_ping(params):
485 """Do a TcpPing on the remote node.
488 return utils.TcpPing(params[1], params[2], timeout=params[3],
489 live_port_needed=params[4], source=params[0])
492 def perspective_node_has_ip_address(params):
493 """Checks if a node has the given ip address.
496 return True, utils.OwnIpAddress(params[0])
499 def perspective_node_info(params):
500 """Query node information.
503 vgname, hypervisor_type = params
504 return backend.GetNodeInfo(vgname, hypervisor_type)
507 def perspective_node_add(params):
508 """Complete the registration of this node in the cluster.
511 return backend.AddNode(params[0], params[1], params[2],
512 params[3], params[4], params[5])
515 def perspective_node_verify(params):
516 """Run a verify sequence on this node.
519 return backend.VerifyNode(params[0], params[1])
522 def perspective_node_start_master(params):
523 """Promote this node to master status.
526 return backend.StartMaster(params[0])
529 def perspective_node_stop_master(params):
530 """Demote this node from master status.
533 return backend.StopMaster(params[0])
536 def perspective_node_leave_cluster(params):
537 """Cleanup after leaving a cluster.
540 return backend.LeaveCluster()
543 def perspective_node_volumes(params):
544 """Query the list of all logical volume groups.
547 return backend.NodeVolumes()
550 def perspective_node_demote_from_mc(params):
551 """Demote a node from the master candidate role.
554 return backend.DemoteFromMC()
558 def perspective_node_powercycle(params):
559 """Tries to powercycle the nod.
562 hypervisor_type = params[0]
563 return backend.PowercycleNode(hypervisor_type)
566 # cluster --------------------------
569 def perspective_version(params):
570 """Query version information.
573 return constants.PROTOCOL_VERSION
576 def perspective_upload_file(params):
579 Note that the backend implementation imposes strict rules on which
583 return backend.UploadFile(*params)
586 def perspective_master_info(params):
587 """Query master information.
590 return backend.GetMasterInfo()
593 def perspective_write_ssconf_files(params):
594 """Write ssconf files.
598 return backend.WriteSsconfFiles(values)
600 # os -----------------------
603 def perspective_os_diagnose(params):
604 """Query detailed information about existing OSes.
607 return [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]
610 def perspective_os_get(params):
611 """Query information about a given OS.
616 os_obj = backend.OSFromDisk(name)
617 except errors.InvalidOS, err:
618 os_obj = objects.OS.FromInvalidOS(err)
619 return os_obj.ToDict()
621 # hooks -----------------------
624 def perspective_hooks_runner(params):
628 hpath, phase, env = params
629 hr = backend.HooksRunner()
630 return hr.RunHooks(hpath, phase, env)
632 # iallocator -----------------
635 def perspective_iallocator_runner(params):
636 """Run an iallocator script.
640 iar = backend.IAllocatorRunner()
641 return iar.Run(name, idata)
643 # test -----------------------
646 def perspective_test_delay(params):
651 return utils.TestDelay(duration)
653 # file storage ---------------
656 def perspective_file_storage_dir_create(params):
657 """Create the file storage directory.
660 file_storage_dir = params[0]
661 return backend.CreateFileStorageDir(file_storage_dir)
664 def perspective_file_storage_dir_remove(params):
665 """Remove the file storage directory.
668 file_storage_dir = params[0]
669 return backend.RemoveFileStorageDir(file_storage_dir)
672 def perspective_file_storage_dir_rename(params):
673 """Rename the file storage directory.
676 old_file_storage_dir = params[0]
677 new_file_storage_dir = params[1]
678 return backend.RenameFileStorageDir(old_file_storage_dir,
679 new_file_storage_dir)
681 # jobs ------------------------
684 @_RequireJobQueueLock
685 def perspective_jobqueue_update(params):
689 (file_name, content) = params
690 return backend.JobQueueUpdate(file_name, content)
693 @_RequireJobQueueLock
694 def perspective_jobqueue_purge(params):
698 return backend.JobQueuePurge()
701 @_RequireJobQueueLock
702 def perspective_jobqueue_rename(params):
703 """Rename a job queue file.
706 # TODO: What if a file fails to rename?
707 return [backend.JobQueueRename(old, new) for old, new in params]
710 def perspective_jobqueue_set_drain(params):
711 """Set/unset the queue drain flag.
714 drain_flag = params[0]
715 return backend.JobQueueSetDrainFlag(drain_flag)
718 # hypervisor ---------------
721 def perspective_hypervisor_validate_params(params):
722 """Validate the hypervisor parameters.
725 (hvname, hvparams) = params
726 return backend.ValidateHVParams(hvname, hvparams)
def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  parser.add_option("-b", "--bind", dest="bind_address",
                    help="Bind address",
                    default="", metavar="ADDRESS")

  options, args = parser.parse_args()
  # the parsed result must be handed back to main(), as documented above
  return options, args
755 """Main function for the node daemon.
760 options, args = ParseOptions()
761 utils.debug = options.debug
766 for fname in (constants.SSL_CERT_FILE,):
767 if not os.path.isfile(fname):
768 print "config %s not there, will not run." % fname
772 port = utils.GetNodeDaemonPort()
773 except errors.ConfigurationError, err:
774 print "Cluster configuration incomplete: '%s'" % str(err)
777 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
778 dirs.append((constants.LOG_OS_DIR, 0750))
779 dirs.append((constants.LOCK_DIR, 1777))
780 utils.EnsureDirs(dirs)
784 utils.Daemonize(logfile=constants.LOG_NODESERVER)
786 utils.WritePidFile(constants.NODED_PID)
788 utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
789 stderr_logging=not options.fork)
790 logging.info("ganeti node daemon startup")
792 # Read SSL certificate
793 ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
794 ssl_cert_path=constants.SSL_CERT_FILE)
797 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
799 mainloop = daemon.Mainloop()
800 server = NodeHttpServer(mainloop, options.bind_address, port,
801 ssl_params=ssl_params, ssl_verify_peer=True)
808 utils.RemovePidFile(constants.NODED_PID)
if __name__ == '__main__':
  # script entry point: run the node daemon
  main()