4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
35 from optparse import OptionParser
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import jstore
43 from ganeti import http
44 from ganeti import utils
def _RequireJobQueueLock(fn):
    """Decorator for job queue manipulating functions.

    The returned wrapper takes the module-level ``queue_lock`` in exclusive,
    blocking mode (waiting at most QUEUE_LOCK_TIMEOUT seconds), calls ``fn``
    with the arguments it was given, and releases the lock again whether
    ``fn`` returns or raises.

    Args:
      fn: the callable to protect

    Returns:
      a callable with the same calling convention and result as ``fn``

    """
    QUEUE_LOCK_TIMEOUT = 10

    def wrapper(*args, **kwargs):
        # Locking in exclusive, blocking mode because there could be several
        # children running at the same time. Waiting up to 10 seconds.
        queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
        try:
            return fn(*args, **kwargs)
        finally:
            # NOTE(review): assumes the lock object exposes Unlock() -- confirm
            # against the type returned by jstore.InitAndVerifyQueue().
            queue_lock.Unlock()

    return wrapper
68 class NodeDaemonRequestHandler(http.HTTPRequestHandler):
69 """The server implementation.
71 This class holds all methods exposed over the RPC interface.
74 def HandleRequest(self):
78 if self.command.upper() != "PUT":
79 raise http.HTTPBadRequest()
82 if path.startswith("/"):
85 method = getattr(self, "perspective_%s" % path, None)
87 raise httperror.HTTPNotFound()
91 return method(self.post_data)
93 logging.exception("Error in RPC call")
95 except errors.QuitGanetiException, err:
97 os.kill(self.server.noded_pid, signal.SIGTERM)
99 # the new block devices --------------------------
102 def perspective_blockdev_create(params):
103 """Create a block device.
106 bdev_s, size, owner, on_primary, info = params
107 bdev = objects.Disk.FromDict(bdev_s)
109 raise ValueError("can't unserialize data!")
110 return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
113 def perspective_blockdev_remove(params):
114 """Remove a block device.
118 bdev = objects.Disk.FromDict(bdev_s)
119 return backend.RemoveBlockDevice(bdev)
122 def perspective_blockdev_rename(params):
123 """Remove a block device.
126 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
127 return backend.RenameBlockDevices(devlist)
130 def perspective_blockdev_assemble(params):
131 """Assemble a block device.
134 bdev_s, owner, on_primary = params
135 bdev = objects.Disk.FromDict(bdev_s)
137 raise ValueError("can't unserialize data!")
138 return backend.AssembleBlockDevice(bdev, owner, on_primary)
141 def perspective_blockdev_shutdown(params):
142 """Shutdown a block device.
146 bdev = objects.Disk.FromDict(bdev_s)
148 raise ValueError("can't unserialize data!")
149 return backend.ShutdownBlockDevice(bdev)
152 def perspective_blockdev_addchildren(params):
153 """Add a child to a mirror device.
155 Note: this is only valid for mirror devices. It's the caller's duty
156 to send a correct disk, otherwise we raise an error.
159 bdev_s, ndev_s = params
160 bdev = objects.Disk.FromDict(bdev_s)
161 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
162 if bdev is None or ndevs.count(None) > 0:
163 raise ValueError("can't unserialize data!")
164 return backend.MirrorAddChildren(bdev, ndevs)
167 def perspective_blockdev_removechildren(params):
168 """Remove a child from a mirror device.
170 This is only valid for mirror devices, of course. It's the callers
171 duty to send a correct disk, otherwise we raise an error.
174 bdev_s, ndev_s = params
175 bdev = objects.Disk.FromDict(bdev_s)
176 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
177 if bdev is None or ndevs.count(None) > 0:
178 raise ValueError("can't unserialize data!")
179 return backend.MirrorRemoveChildren(bdev, ndevs)
182 def perspective_blockdev_getmirrorstatus(params):
183 """Return the mirror status for a list of disks.
186 disks = [objects.Disk.FromDict(dsk_s)
188 return backend.GetMirrorStatus(disks)
191 def perspective_blockdev_find(params):
192 """Expose the FindBlockDevice functionality for a disk.
194 This will try to find but not activate a disk.
197 disk = objects.Disk.FromDict(params[0])
198 return backend.FindBlockDevice(disk)
201 def perspective_blockdev_snapshot(params):
202 """Create a snapshot device.
204 Note that this is only valid for LVM disks, if we get passed
205 something else we raise an exception. The snapshot device can be
206 remove by calling the generic block device remove call.
209 cfbd = objects.Disk.FromDict(params[0])
210 return backend.SnapshotBlockDevice(cfbd)
213 def perspective_blockdev_grow(params):
214 """Grow a stack of devices.
217 cfbd = objects.Disk.FromDict(params[0])
219 return backend.GrowBlockDevice(cfbd, amount)
222 def perspective_blockdev_close(params):
223 """Closes the given block devices.
226 disks = [objects.Disk.FromDict(cf) for cf in params]
227 return backend.CloseBlockDevices(disks)
229 # export/import --------------------------
232 def perspective_snapshot_export(params):
233 """Export a given snapshot.
236 disk = objects.Disk.FromDict(params[0])
237 dest_node = params[1]
238 instance = objects.Instance.FromDict(params[2])
239 cluster_name = params[3]
240 return backend.ExportSnapshot(disk, dest_node, instance, cluster_name)
243 def perspective_finalize_export(params):
244 """Expose the finalize export functionality.
247 instance = objects.Instance.FromDict(params[0])
248 snap_disks = [objects.Disk.FromDict(str_data)
249 for str_data in params[1]]
250 return backend.FinalizeExport(instance, snap_disks)
253 def perspective_export_info(params):
254 """Query information about an existing export on this node.
256 The given path may not contain an export, in which case we return
261 einfo = backend.ExportInfo(path)
267 def perspective_export_list(params):
268 """List the available exports on this node.
270 Note that as opposed to export_info, which may query data about an
271 export in any path, this only queries the standard Ganeti path
272 (constants.EXPORT_DIR).
275 return backend.ListExports()
278 def perspective_export_remove(params):
283 return backend.RemoveExport(export)
285 # volume --------------------------
288 def perspective_volume_list(params):
289 """Query the list of logical volumes in a given volume group.
293 return backend.GetVolumeList(vgname)
296 def perspective_vg_list(params):
297 """Query the list of volume groups.
300 return backend.ListVolumeGroups()
302 # bridge --------------------------
305 def perspective_bridges_exist(params):
306 """Check if all bridges given exist on this node.
309 bridges_list = params[0]
310 return backend.BridgesExist(bridges_list)
312 # instance --------------------------
315 def perspective_instance_os_add(params):
316 """Install an OS on a given instance.
319 inst_s, os_disk, swap_disk = params
320 inst = objects.Instance.FromDict(inst_s)
321 return backend.AddOSToInstance(inst, os_disk, swap_disk)
324 def perspective_instance_run_rename(params):
325 """Runs the OS rename script for an instance.
328 inst_s, old_name, os_disk, swap_disk = params
329 inst = objects.Instance.FromDict(inst_s)
330 return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
333 def perspective_instance_os_import(params):
334 """Run the import function of an OS onto a given instance.
337 inst_s, os_disk, swap_disk, src_node, src_image, cluster_name = params
338 inst = objects.Instance.FromDict(inst_s)
339 return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
340 src_node, src_image, cluster_name)
343 def perspective_instance_shutdown(params):
344 """Shutdown an instance.
347 instance = objects.Instance.FromDict(params[0])
348 return backend.ShutdownInstance(instance)
351 def perspective_instance_start(params):
352 """Start an instance.
355 instance = objects.Instance.FromDict(params[0])
356 extra_args = params[1]
357 return backend.StartInstance(instance, extra_args)
360 def perspective_instance_migrate(params):
361 """Migrates an instance.
364 instance, target, live = params
365 return backend.MigrateInstance(instance, target, live)
368 def perspective_instance_reboot(params):
369 """Reboot an instance.
372 instance = objects.Instance.FromDict(params[0])
373 reboot_type = params[1]
374 extra_args = params[2]
375 return backend.RebootInstance(instance, reboot_type, extra_args)
378 def perspective_instance_info(params):
379 """Query instance information.
382 return backend.GetInstanceInfo(params[0])
385 def perspective_all_instances_info(params):
386 """Query information about all instances.
389 return backend.GetAllInstancesInfo()
392 def perspective_instance_list(params):
393 """Query the list of running instances.
396 return backend.GetInstanceList()
398 # node --------------------------
401 def perspective_node_tcp_ping(params):
402 """Do a TcpPing on the remote node.
405 return utils.TcpPing(params[1], params[2], timeout=params[3],
406 live_port_needed=params[4], source=params[0])
409 def perspective_node_info(params):
410 """Query node information.
414 return backend.GetNodeInfo(vgname)
417 def perspective_node_add(params):
418 """Complete the registration of this node in the cluster.
421 return backend.AddNode(params[0], params[1], params[2],
422 params[3], params[4], params[5])
425 def perspective_node_verify(params):
426 """Run a verify sequence on this node.
429 return backend.VerifyNode(params[0], params[1])
432 def perspective_node_start_master(params):
433 """Promote this node to master status.
436 return backend.StartMaster(params[0])
439 def perspective_node_stop_master(params):
440 """Demote this node from master status.
443 return backend.StopMaster(params[0])
446 def perspective_node_leave_cluster(params):
447 """Cleanup after leaving a cluster.
450 return backend.LeaveCluster()
453 def perspective_node_volumes(params):
454 """Query the list of all logical volume groups.
457 return backend.NodeVolumes()
459 # cluster --------------------------
462 def perspective_version(params):
463 """Query version information.
466 return constants.PROTOCOL_VERSION
469 def perspective_upload_file(params):
472 Note that the backend implementation imposes strict rules on which
476 return backend.UploadFile(*params)
479 def perspective_master_info(params):
480 """Query master information.
483 return backend.GetMasterInfo()
485 # os -----------------------
488 def perspective_os_diagnose(params):
489 """Query detailed information about existing OSes.
492 return [os.ToDict() for os in backend.DiagnoseOS()]
495 def perspective_os_get(params):
496 """Query information about a given OS.
501 os_obj = backend.OSFromDisk(name)
502 except errors.InvalidOS, err:
503 os_obj = objects.OS.FromInvalidOS(err)
504 return os_obj.ToDict()
506 # hooks -----------------------
509 def perspective_hooks_runner(params):
513 hpath, phase, env = params
514 hr = backend.HooksRunner()
515 return hr.RunHooks(hpath, phase, env)
517 # iallocator -----------------
520 def perspective_iallocator_runner(params):
521 """Run an iallocator script.
525 iar = backend.IAllocatorRunner()
526 return iar.Run(name, idata)
528 # test -----------------------
531 def perspective_test_delay(params):
536 return utils.TestDelay(duration)
538 # file storage ---------------
541 def perspective_file_storage_dir_create(params):
542 """Create the file storage directory.
545 file_storage_dir = params[0]
546 return backend.CreateFileStorageDir(file_storage_dir)
549 def perspective_file_storage_dir_remove(params):
550 """Remove the file storage directory.
553 file_storage_dir = params[0]
554 return backend.RemoveFileStorageDir(file_storage_dir)
557 def perspective_file_storage_dir_rename(params):
558 """Rename the file storage directory.
561 old_file_storage_dir = params[0]
562 new_file_storage_dir = params[1]
563 return backend.RenameFileStorageDir(old_file_storage_dir,
564 new_file_storage_dir)
566 # jobs ------------------------
569 @_RequireJobQueueLock
570 def perspective_jobqueue_update(params):
574 (file_name, content) = params
575 return backend.JobQueueUpdate(file_name, content)
578 @_RequireJobQueueLock
579 def perspective_jobqueue_purge(params):
583 return backend.JobQueuePurge()
586 @_RequireJobQueueLock
587 def perspective_jobqueue_rename(params):
588 """Rename a job queue file.
593 return backend.JobQueueRename(old, new)
class NodeDaemonHttpServer(http.HTTPServer):
    """HTTP server that answers requests with NodeDaemonRequestHandler.

    The pid of the process creating the server is kept in ``noded_pid``.

    """
    def __init__(self, server_address):
        """Set up the server on ``server_address`` and record our pid."""
        http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
        self.noded_pid = os.getpid()

    def serve_forever(self):
        """Handle requests until told to quit.

        Requests are served one at a time until SIGINT or SIGTERM has been
        received.

        """
        sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
        try:
            while not sighandler.called:
                self.handle_request()
            # TODO: There could be children running at this point
        finally:
            # Undo the handler installation even if handle_request() raised.
            # NOTE(review): assumes utils.SignalHandler provides Reset() --
            # confirm.
            sighandler.Reset()
class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
    """Node daemon HTTP server that forks for every request.

    Mixing ForkingMixIn into the HTTP server makes each request run in a
    child process of its own, so that several requests can be handled
    concurrently.

    """
def ParseOptions():
    """Parse the command line options.

    Returns:
      (options, args) as from OptionParser.parse_args()

    """
    version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
    parser = OptionParser(description="Ganeti node daemon",
                          usage="%prog [-f] [-d]",
                          version=version_text)

    # -f keeps the daemon attached to the terminal, i.e. it turns "fork" off
    parser.add_option("-f", "--foreground",
                      action="store_false", dest="fork", default=True,
                      help="Don't detach from the current terminal")
    parser.add_option("-d", "--debug",
                      action="store_true", dest="debug", default=False,
                      help="Enable some debug messages")

    return parser.parse_args()
644 """Main function for the node daemon.
649 options, args = ParseOptions()
650 utils.debug = options.debug
651 for fname in (constants.SSL_CERT_FILE,):
652 if not os.path.isfile(fname):
653 print "config %s not there, will not run." % fname
657 port = utils.GetNodeDaemonPort()
658 pwdata = utils.GetNodeDaemonPassword()
659 except errors.ConfigurationError, err:
660 print "Cluster configuration incomplete: '%s'" % str(err)
663 # create the various SUB_RUN_DIRS, if not existing, so that we handle the
664 # situation where RUN_DIR is tmpfs
665 for dir_name in constants.SUB_RUN_DIRS:
666 if not os.path.exists(dir_name):
668 os.mkdir(dir_name, 0755)
669 except EnvironmentError, err:
670 if err.errno != errno.EEXIST:
671 print ("Node setup wrong, cannot create directory %s: %s" %
674 if not os.path.isdir(dir_name):
675 print ("Node setup wrong, %s is not a directory" % dir_name)
680 utils.Daemonize(logfile=constants.LOG_NODESERVER)
682 utils.WritePidFile(constants.NODED_PID)
684 logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
685 stderr_logging=not options.fork)
686 logging.info("ganeti node daemon startup")
689 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
692 server = ForkingHTTPServer(('', port))
694 server = NodeDaemonHttpServer(('', port))
697 server.serve_forever()
699 utils.RemovePidFile(constants.NODED_PID)
702 if __name__ == '__main__':