# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Ganeti node daemon"""

# functions in this module need to have a given name structure, so:
# pylint: disable-msg=C0103
import os
import sys
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils

import ganeti.http.server
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Serializes access to the job queue by taking the (module-global)
  queue lock around the wrapped call and always releasing it again,
  even if the wrapped function raises.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      # without this the lock would leak on success and on error alike
      queue_lock.Release()

  return wrapper
70 class NodeHttpServer(http.server.HttpServer):
71 """The server implementation.
73 This class holds all methods exposed over the RPC interface.
76 def __init__(self, *args, **kwargs):
77 http.server.HttpServer.__init__(self, *args, **kwargs)
78 self.noded_pid = os.getpid()
80 def HandleRequest(self, req):
84 if req.request_method.upper() != http.HTTP_PUT:
85 raise http.HttpBadRequest()
87 path = req.request_path
88 if path.startswith("/"):
91 method = getattr(self, "perspective_%s" % path, None)
93 raise http.HttpNotFound()
97 return method(req.request_body)
98 except backend.RPCFail, err:
99 # our custom failure exception; str(err) works fine if the
100 # exception was constructed with a single argument, and in
101 # this case, err.message == err.args[0] == str(err)
102 return (False, str(err))
104 logging.exception("Error in RPC call")
106 except errors.QuitGanetiException, err:
107 # Tell parent to quit
108 os.kill(self.noded_pid, signal.SIGTERM)
110 # the new block devices --------------------------
113 def perspective_blockdev_create(params):
114 """Create a block device.
117 bdev_s, size, owner, on_primary, info = params
118 bdev = objects.Disk.FromDict(bdev_s)
120 raise ValueError("can't unserialize data!")
121 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
124 def perspective_blockdev_remove(params):
125 """Remove a block device.
129 bdev = objects.Disk.FromDict(bdev_s)
130 return backend.BlockdevRemove(bdev)
133 def perspective_blockdev_rename(params):
134 """Remove a block device.
137 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
138 return backend.BlockdevRename(devlist)
141 def perspective_blockdev_assemble(params):
142 """Assemble a block device.
145 bdev_s, owner, on_primary = params
146 bdev = objects.Disk.FromDict(bdev_s)
148 raise ValueError("can't unserialize data!")
149 return backend.BlockdevAssemble(bdev, owner, on_primary)
152 def perspective_blockdev_shutdown(params):
153 """Shutdown a block device.
157 bdev = objects.Disk.FromDict(bdev_s)
159 raise ValueError("can't unserialize data!")
160 return backend.BlockdevShutdown(bdev)
163 def perspective_blockdev_addchildren(params):
164 """Add a child to a mirror device.
166 Note: this is only valid for mirror devices. It's the caller's duty
167 to send a correct disk, otherwise we raise an error.
170 bdev_s, ndev_s = params
171 bdev = objects.Disk.FromDict(bdev_s)
172 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
173 if bdev is None or ndevs.count(None) > 0:
174 raise ValueError("can't unserialize data!")
175 return backend.BlockdevAddchildren(bdev, ndevs)
178 def perspective_blockdev_removechildren(params):
179 """Remove a child from a mirror device.
181 This is only valid for mirror devices, of course. It's the callers
182 duty to send a correct disk, otherwise we raise an error.
185 bdev_s, ndev_s = params
186 bdev = objects.Disk.FromDict(bdev_s)
187 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
188 if bdev is None or ndevs.count(None) > 0:
189 raise ValueError("can't unserialize data!")
190 return backend.BlockdevRemovechildren(bdev, ndevs)
193 def perspective_blockdev_getmirrorstatus(params):
194 """Return the mirror status for a list of disks.
197 disks = [objects.Disk.FromDict(dsk_s)
199 return backend.BlockdevGetmirrorstatus(disks)
202 def perspective_blockdev_find(params):
203 """Expose the FindBlockDevice functionality for a disk.
205 This will try to find but not activate a disk.
208 disk = objects.Disk.FromDict(params[0])
209 return backend.BlockdevFind(disk)
212 def perspective_blockdev_snapshot(params):
213 """Create a snapshot device.
215 Note that this is only valid for LVM disks, if we get passed
216 something else we raise an exception. The snapshot device can be
217 remove by calling the generic block device remove call.
220 cfbd = objects.Disk.FromDict(params[0])
221 return backend.BlockdevSnapshot(cfbd)
224 def perspective_blockdev_grow(params):
225 """Grow a stack of devices.
228 cfbd = objects.Disk.FromDict(params[0])
230 return backend.BlockdevGrow(cfbd, amount)
233 def perspective_blockdev_close(params):
234 """Closes the given block devices.
237 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
238 return backend.BlockdevClose(params[0], disks)
240 # blockdev/drbd specific methods ----------
243 def perspective_drbd_disconnect_net(params):
244 """Disconnects the network connection of drbd disks.
246 Note that this is only valid for drbd disks, so the members of the
247 disk list must all be drbd devices.
250 nodes_ip, disks = params
251 disks = [objects.Disk.FromDict(cf) for cf in disks]
252 return backend.DrbdDisconnectNet(nodes_ip, disks)
255 def perspective_drbd_attach_net(params):
256 """Attaches the network connection of drbd disks.
258 Note that this is only valid for drbd disks, so the members of the
259 disk list must all be drbd devices.
262 nodes_ip, disks, instance_name, multimaster = params
263 disks = [objects.Disk.FromDict(cf) for cf in disks]
264 return backend.DrbdAttachNet(nodes_ip, disks,
265 instance_name, multimaster)
268 def perspective_drbd_wait_sync(params):
269 """Wait until DRBD disks are synched.
271 Note that this is only valid for drbd disks, so the members of the
272 disk list must all be drbd devices.
275 nodes_ip, disks = params
276 disks = [objects.Disk.FromDict(cf) for cf in disks]
277 return backend.DrbdWaitSync(nodes_ip, disks)
279 # export/import --------------------------
282 def perspective_snapshot_export(params):
283 """Export a given snapshot.
286 disk = objects.Disk.FromDict(params[0])
287 dest_node = params[1]
288 instance = objects.Instance.FromDict(params[2])
289 cluster_name = params[3]
291 return backend.ExportSnapshot(disk, dest_node, instance,
292 cluster_name, dev_idx)
295 def perspective_finalize_export(params):
296 """Expose the finalize export functionality.
299 instance = objects.Instance.FromDict(params[0])
300 snap_disks = [objects.Disk.FromDict(str_data)
301 for str_data in params[1]]
302 return backend.FinalizeExport(instance, snap_disks)
305 def perspective_export_info(params):
306 """Query information about an existing export on this node.
308 The given path may not contain an export, in which case we return
313 einfo = backend.ExportInfo(path)
319 def perspective_export_list(params):
320 """List the available exports on this node.
322 Note that as opposed to export_info, which may query data about an
323 export in any path, this only queries the standard Ganeti path
324 (constants.EXPORT_DIR).
327 return backend.ListExports()
330 def perspective_export_remove(params):
335 return backend.RemoveExport(export)
337 # volume --------------------------
340 def perspective_volume_list(params):
341 """Query the list of logical volumes in a given volume group.
345 return backend.GetVolumeList(vgname)
348 def perspective_vg_list(params):
349 """Query the list of volume groups.
352 return backend.ListVolumeGroups()
354 # bridge --------------------------
357 def perspective_bridges_exist(params):
358 """Check if all bridges given exist on this node.
361 bridges_list = params[0]
362 return backend.BridgesExist(bridges_list)
364 # instance --------------------------
367 def perspective_instance_os_add(params):
368 """Install an OS on a given instance.
372 inst = objects.Instance.FromDict(inst_s)
373 reinstall = params[1]
374 return backend.InstanceOsAdd(inst, reinstall)
377 def perspective_instance_run_rename(params):
378 """Runs the OS rename script for an instance.
381 inst_s, old_name = params
382 inst = objects.Instance.FromDict(inst_s)
383 return backend.RunRenameInstance(inst, old_name)
386 def perspective_instance_os_import(params):
387 """Run the import function of an OS onto a given instance.
390 inst_s, src_node, src_images, cluster_name = params
391 inst = objects.Instance.FromDict(inst_s)
392 return backend.ImportOSIntoInstance(inst, src_node, src_images,
396 def perspective_instance_shutdown(params):
397 """Shutdown an instance.
400 instance = objects.Instance.FromDict(params[0])
401 return backend.InstanceShutdown(instance)
404 def perspective_instance_start(params):
405 """Start an instance.
408 instance = objects.Instance.FromDict(params[0])
409 return backend.StartInstance(instance)
412 def perspective_migration_info(params):
413 """Gather information about an instance to be migrated.
416 instance = objects.Instance.FromDict(params[0])
417 return backend.MigrationInfo(instance)
420 def perspective_accept_instance(params):
421 """Prepare the node to accept an instance.
424 instance, info, target = params
425 instance = objects.Instance.FromDict(instance)
426 return backend.AcceptInstance(instance, info, target)
429 def perspective_finalize_migration(params):
430 """Finalize the instance migration.
433 instance, info, success = params
434 instance = objects.Instance.FromDict(instance)
435 return backend.FinalizeMigration(instance, info, success)
438 def perspective_instance_migrate(params):
439 """Migrates an instance.
442 instance, target, live = params
443 instance = objects.Instance.FromDict(instance)
444 return backend.MigrateInstance(instance, target, live)
447 def perspective_instance_reboot(params):
448 """Reboot an instance.
451 instance = objects.Instance.FromDict(params[0])
452 reboot_type = params[1]
453 return backend.InstanceReboot(instance, reboot_type)
456 def perspective_instance_info(params):
457 """Query instance information.
460 return backend.GetInstanceInfo(params[0], params[1])
463 def perspective_instance_migratable(params):
464 """Query whether the specified instance can be migrated.
467 instance = objects.Instance.FromDict(params[0])
468 return backend.GetInstanceMigratable(instance)
471 def perspective_all_instances_info(params):
472 """Query information about all instances.
475 return backend.GetAllInstancesInfo(params[0])
478 def perspective_instance_list(params):
479 """Query the list of running instances.
482 return backend.GetInstanceList(params[0])
484 # node --------------------------
487 def perspective_node_tcp_ping(params):
488 """Do a TcpPing on the remote node.
491 return utils.TcpPing(params[1], params[2], timeout=params[3],
492 live_port_needed=params[4], source=params[0])
495 def perspective_node_has_ip_address(params):
496 """Checks if a node has the given ip address.
499 return utils.OwnIpAddress(params[0])
502 def perspective_node_info(params):
503 """Query node information.
506 vgname, hypervisor_type = params
507 return backend.GetNodeInfo(vgname, hypervisor_type)
510 def perspective_node_add(params):
511 """Complete the registration of this node in the cluster.
514 return backend.AddNode(params[0], params[1], params[2],
515 params[3], params[4], params[5])
518 def perspective_node_verify(params):
519 """Run a verify sequence on this node.
522 return backend.VerifyNode(params[0], params[1])
525 def perspective_node_start_master(params):
526 """Promote this node to master status.
529 return backend.StartMaster(params[0])
532 def perspective_node_stop_master(params):
533 """Demote this node from master status.
536 return backend.StopMaster(params[0])
539 def perspective_node_leave_cluster(params):
540 """Cleanup after leaving a cluster.
543 return backend.LeaveCluster()
546 def perspective_node_volumes(params):
547 """Query the list of all logical volume groups.
550 return backend.NodeVolumes()
553 def perspective_node_demote_from_mc(params):
554 """Demote a node from the master candidate role.
557 return backend.DemoteFromMC()
561 def perspective_node_powercycle(params):
562 """Tries to powercycle the nod.
565 hypervisor_type = params[0]
566 return backend.PowercycleNode(hypervisor_type)
569 # cluster --------------------------
572 def perspective_version(params):
573 """Query version information.
576 return constants.PROTOCOL_VERSION
579 def perspective_upload_file(params):
582 Note that the backend implementation imposes strict rules on which
586 return backend.UploadFile(*params)
589 def perspective_master_info(params):
590 """Query master information.
593 return backend.GetMasterInfo()
596 def perspective_write_ssconf_files(params):
597 """Write ssconf files.
601 return backend.WriteSsconfFiles(values)
603 # os -----------------------
606 def perspective_os_diagnose(params):
607 """Query detailed information about existing OSes.
610 return [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]
613 def perspective_os_get(params):
614 """Query information about a given OS.
619 os_obj = backend.OSFromDisk(name)
620 except errors.InvalidOS, err:
621 os_obj = objects.OS.FromInvalidOS(err)
622 return os_obj.ToDict()
624 # hooks -----------------------
627 def perspective_hooks_runner(params):
631 hpath, phase, env = params
632 hr = backend.HooksRunner()
633 return hr.RunHooks(hpath, phase, env)
635 # iallocator -----------------
638 def perspective_iallocator_runner(params):
639 """Run an iallocator script.
643 iar = backend.IAllocatorRunner()
644 return iar.Run(name, idata)
646 # test -----------------------
649 def perspective_test_delay(params):
654 return utils.TestDelay(duration)
656 # file storage ---------------
659 def perspective_file_storage_dir_create(params):
660 """Create the file storage directory.
663 file_storage_dir = params[0]
664 return backend.CreateFileStorageDir(file_storage_dir)
667 def perspective_file_storage_dir_remove(params):
668 """Remove the file storage directory.
671 file_storage_dir = params[0]
672 return backend.RemoveFileStorageDir(file_storage_dir)
675 def perspective_file_storage_dir_rename(params):
676 """Rename the file storage directory.
679 old_file_storage_dir = params[0]
680 new_file_storage_dir = params[1]
681 return backend.RenameFileStorageDir(old_file_storage_dir,
682 new_file_storage_dir)
684 # jobs ------------------------
687 @_RequireJobQueueLock
688 def perspective_jobqueue_update(params):
692 (file_name, content) = params
693 return backend.JobQueueUpdate(file_name, content)
696 @_RequireJobQueueLock
697 def perspective_jobqueue_purge(params):
701 return backend.JobQueuePurge()
704 @_RequireJobQueueLock
705 def perspective_jobqueue_rename(params):
706 """Rename a job queue file.
709 # TODO: What if a file fails to rename?
710 return [backend.JobQueueRename(old, new) for old, new in params]
713 def perspective_jobqueue_set_drain(params):
714 """Set/unset the queue drain flag.
717 drain_flag = params[0]
718 return backend.JobQueueSetDrainFlag(drain_flag)
721 # hypervisor ---------------
724 def perspective_hypervisor_validate_params(params):
725 """Validate the hypervisor parameters.
728 (hvname, hvparams) = params
729 return backend.ValidateHVParams(hvname, hvparams)
def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  parser.add_option("-b", "--bind", dest="bind_address",
                    help="Bind address",
                    default="", metavar="ADDRESS")

  options, args = parser.parse_args()
  return options, args
758 """Main function for the node daemon.
763 options, args = ParseOptions()
764 utils.debug = options.debug
769 for fname in (constants.SSL_CERT_FILE,):
770 if not os.path.isfile(fname):
771 print "config %s not there, will not run." % fname
775 port = utils.GetNodeDaemonPort()
776 except errors.ConfigurationError, err:
777 print "Cluster configuration incomplete: '%s'" % str(err)
780 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
781 dirs.append((constants.LOG_OS_DIR, 0750))
782 dirs.append((constants.LOCK_DIR, 1777))
783 utils.EnsureDirs(dirs)
787 utils.Daemonize(logfile=constants.LOG_NODESERVER)
789 utils.WritePidFile(constants.NODED_PID)
791 utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
792 stderr_logging=not options.fork)
793 logging.info("ganeti node daemon startup")
795 # Read SSL certificate
796 ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
797 ssl_cert_path=constants.SSL_CERT_FILE)
800 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
802 mainloop = daemon.Mainloop()
803 server = NodeHttpServer(mainloop, options.bind_address, port,
804 ssl_params=ssl_params, ssl_verify_peer=True)
811 utils.RemovePidFile(constants.NODED_PID)
814 if __name__ == '__main__':