4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
35 from optparse import OptionParser
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import jstore
43 from ganeti import daemon
44 from ganeti import http
45 from ganeti import utils
51 def _RequireJobQueueLock(fn):
52 """Decorator for job queue manipulating functions.
55 QUEUE_LOCK_TIMEOUT = 10
57 def wrapper(*args, **kwargs):
58 # Locking in exclusive, blocking mode because there could be several
59 # children running at the same time. Waiting up to 10 seconds.
60 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
62 return fn(*args, **kwargs)
69 class NodeHttpServer(http.HttpServer):
70 """The server implementation.
72 This class holds all methods exposed over the RPC interface.
def __init__(self, *args, **kwargs):
    """Initialise the HTTP server and remember the creating process' pid.

    All arguments are forwarded unchanged to http.HttpServer.__init__.
    """
    http.HttpServer.__init__(self, *args, **kwargs)
    # Recorded so that HandleRequest can signal this process later.
    creator_pid = os.getpid()
    self.noded_pid = creator_pid
def HandleRequest(self, req):
    """Dispatch an incoming request to the matching perspective_* method.

    Only PUT requests are accepted. The request path, minus one leading
    slash, selects the attribute named perspective_<path>, which is called
    with the request's POST data; its result is returned.

    Raises http.HTTPBadRequest for non-PUT requests and http.HTTPNotFound
    when no matching attribute exists. Any exception raised by the method
    is logged and re-raised, except errors.QuitGanetiException: in that
    case SIGTERM is sent to the pid stored in self.noded_pid and the call
    returns None.
    """
    if req.request_method.upper() != "PUT":
        raise http.HTTPBadRequest()

    path = req.request_path
    if path.startswith("/"):
        path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
        raise http.HTTPNotFound()

    try:
        try:
            return method(req.request_post_data)
        except:
            # Log every failure with its traceback, then let it propagate
            # (the outer handler still sees QuitGanetiException).
            logging.exception("Error in RPC call")
            raise
    except errors.QuitGanetiException:
        # Tell parent to quit
        os.kill(self.noded_pid, signal.SIGTERM)
104 # the new block devices --------------------------
@staticmethod
def perspective_blockdev_create(params):
    """Create a block device.

    params is (bdev_s, size, owner, on_primary, info); bdev_s is the dict
    form of the disk, as accepted by objects.Disk.FromDict.

    Returns the result of backend.CreateBlockDevice. Raises ValueError if
    the disk cannot be deserialized.
    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
        raise ValueError("can't unserialize data!")
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
@staticmethod
def perspective_blockdev_remove(params):
    """Remove a block device.

    params[0] is the dict form of the disk (see objects.Disk.FromDict).
    Returns the result of backend.RemoveBlockDevice.
    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.RemoveBlockDevice(bdev)
@staticmethod
def perspective_blockdev_rename(params):
    """Rename block devices.

    params is an iterable of (disk_dict, uid) pairs; each disk_dict is
    deserialized with objects.Disk.FromDict before the list is handed to
    backend.RenameBlockDevices, whose result is returned.
    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.RenameBlockDevices(devlist)
@staticmethod
def perspective_blockdev_assemble(params):
    """Assemble a block device.

    params is (bdev_s, owner, on_primary); bdev_s is the dict form of the
    disk.

    Returns the result of backend.AssembleBlockDevice. Raises ValueError if
    the disk cannot be deserialized.
    """
    bdev_s, owner, on_primary = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
        raise ValueError("can't unserialize data!")
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
@staticmethod
def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    params[0] is the dict form of the disk.

    Returns the result of backend.ShutdownBlockDevice. Raises ValueError if
    the disk cannot be deserialized.
    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
        raise ValueError("can't unserialize data!")
    return backend.ShutdownBlockDevice(bdev)
@staticmethod
def perspective_blockdev_addchildren(params):
    """Add children to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    params is (bdev_s, ndev_s): the dict form of the parent disk and a
    list of dict forms of the new children.

    Returns the result of backend.MirrorAddChildren. Raises ValueError if
    any disk cannot be deserialized.
    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or None in ndevs:
        raise ValueError("can't unserialize data!")
    return backend.MirrorAddChildren(bdev, ndevs)
@staticmethod
def perspective_blockdev_removechildren(params):
    """Remove children from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    params is (bdev_s, ndev_s): the dict form of the parent disk and a
    list of dict forms of the children to remove.

    Returns the result of backend.MirrorRemoveChildren. Raises ValueError
    if any disk cannot be deserialized.
    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or None in ndevs:
        raise ValueError("can't unserialize data!")
    return backend.MirrorRemoveChildren(bdev, ndevs)
@staticmethod
def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    params is an iterable of disk dicts; each is deserialized with
    objects.Disk.FromDict. Returns the result of backend.GetMirrorStatus.
    """
    disks = [objects.Disk.FromDict(dsk_s) for dsk_s in params]
    return backend.GetMirrorStatus(disks)
@staticmethod
def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    params[0] is the dict form of the disk. Returns the result of
    backend.FindBlockDevice.
    """
    disk = objects.Disk.FromDict(params[0])
    return backend.FindBlockDevice(disk)
@staticmethod
def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    params[0] is the dict form of the disk. Returns the result of
    backend.SnapshotBlockDevice.
    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.SnapshotBlockDevice(cfbd)
@staticmethod
def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    params[0] is the dict form of the disk and params[1] the amount to
    grow by, passed through to backend.GrowBlockDevice, whose result is
    returned.
    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    return backend.GrowBlockDevice(cfbd, amount)
@staticmethod
def perspective_blockdev_close(params):
    """Close the given block devices.

    params is an iterable of disk dicts. Returns the result of
    backend.CloseBlockDevices.
    """
    disks = [objects.Disk.FromDict(cf) for cf in params]
    return backend.CloseBlockDevices(disks)
234 # export/import --------------------------
@staticmethod
def perspective_snapshot_export(params):
    """Export a given snapshot.

    params is indexed as: [0] disk dict, [1] destination node,
    [2] instance dict, [3] cluster name. Returns the result of
    backend.ExportSnapshot.
    """
    disk = objects.Disk.FromDict(params[0])
    dest_node = params[1]
    instance = objects.Instance.FromDict(params[2])
    cluster_name = params[3]
    return backend.ExportSnapshot(disk, dest_node, instance, cluster_name)
@staticmethod
def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    params[0] is the instance dict and params[1] an iterable of disk dicts
    for the snapshot disks. Returns the result of backend.FinalizeExport.
    """
    instance = objects.Instance.FromDict(params[0])
    snap_disks = [objects.Disk.FromDict(str_data)
                  for str_data in params[1]]
    return backend.FinalizeExport(instance, snap_disks)
258 def perspective_export_info(params):
259 """Query information about an existing export on this node.
261 The given path may not contain an export, in which case we return
266 einfo = backend.ExportInfo(path)
@staticmethod
def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    params is ignored. Returns the result of backend.ListExports.
    """
    return backend.ListExports()
@staticmethod
def perspective_export_remove(params):
    """Remove an export.

    params[0] identifies the export and is passed to backend.RemoveExport,
    whose result is returned.
    """
    export = params[0]
    return backend.RemoveExport(export)
290 # volume --------------------------
@staticmethod
def perspective_volume_list(params):
    """Query the list of logical volumes in a given volume group.

    params[0] is the volume group name. Returns the result of
    backend.GetVolumeList.
    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)
@staticmethod
def perspective_vg_list(params):
    """Query the list of volume groups.

    params is ignored. Returns the result of backend.ListVolumeGroups.
    """
    return backend.ListVolumeGroups()
307 # bridge --------------------------
@staticmethod
def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    params[0] is the list of bridges. Returns the result of
    backend.BridgesExist.
    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)
317 # instance --------------------------
@staticmethod
def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    params is (inst_s, os_disk, swap_disk); inst_s is the dict form of the
    instance. Returns the result of backend.AddOSToInstance.
    """
    inst_s, os_disk, swap_disk = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
@staticmethod
def perspective_instance_run_rename(params):
    """Run the OS rename script for an instance.

    params is (inst_s, old_name, os_disk, swap_disk); inst_s is the dict
    form of the instance. Returns the result of backend.RunRenameInstance.
    """
    inst_s, old_name, os_disk, swap_disk = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
@staticmethod
def perspective_instance_os_import(params):
    """Run the import function of an OS onto a given instance.

    params is (inst_s, os_disk, swap_disk, src_node, src_image,
    cluster_name); inst_s is the dict form of the instance. Returns the
    result of backend.ImportOSIntoInstance.
    """
    inst_s, os_disk, swap_disk, src_node, src_image, cluster_name = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
                                        src_node, src_image, cluster_name)
@staticmethod
def perspective_instance_shutdown(params):
    """Shutdown an instance.

    params[0] is the dict form of the instance. Returns the result of
    backend.ShutdownInstance.
    """
    instance = objects.Instance.FromDict(params[0])
    return backend.ShutdownInstance(instance)
@staticmethod
def perspective_instance_start(params):
    """Start an instance.

    params[0] is the dict form of the instance, params[1] the extra
    arguments handed to backend.StartInstance, whose result is returned.
    """
    instance = objects.Instance.FromDict(params[0])
    extra_args = params[1]
    return backend.StartInstance(instance, extra_args)
@staticmethod
def perspective_instance_migrate(params):
    """Migrate an instance.

    params is (instance_dict, target, live). Returns the result of
    backend.MigrateInstance.
    """
    inst_s, target, live = params
    # Use a separate name for the deserialized object instead of rebinding
    # the variable that held the dict form.
    instance = objects.Instance.FromDict(inst_s)
    return backend.MigrateInstance(instance, target, live)
@staticmethod
def perspective_instance_reboot(params):
    """Reboot an instance.

    params is indexed as: [0] instance dict, [1] reboot type, [2] extra
    arguments. Returns the result of backend.RebootInstance.
    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    extra_args = params[2]
    return backend.RebootInstance(instance, reboot_type, extra_args)
@staticmethod
def perspective_instance_info(params):
    """Query instance information.

    params[0] and params[1] are passed, in that order, to
    backend.GetInstanceInfo, whose result is returned.
    """
    return backend.GetInstanceInfo(params[0], params[1])
@staticmethod
def perspective_all_instances_info(params):
    """Query information about all instances.

    params[0] is passed to backend.GetAllInstancesInfo, whose result is
    returned.
    """
    return backend.GetAllInstancesInfo(params[0])
@staticmethod
def perspective_instance_list(params):
    """Query the list of running instances.

    params[0] is passed to backend.GetInstanceList, whose result is
    returned.
    """
    return backend.GetInstanceList(params[0])
404 # node --------------------------
@staticmethod
def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    params is indexed as: [0] source, [1] and [2] the two positional
    arguments of utils.TcpPing, [3] timeout, [4] live_port_needed.
    Returns the result of utils.TcpPing.
    """
    return utils.TcpPing(params[1], params[2], timeout=params[3],
                         live_port_needed=params[4], source=params[0])
@staticmethod
def perspective_node_has_ip_address(params):
    """Check if a node has the given IP address.

    params[0] is the address, passed to utils.OwnIpAddress, whose result is
    returned.
    """
    return utils.OwnIpAddress(params[0])
@staticmethod
def perspective_node_info(params):
    """Query node information.

    params is (vgname, hypervisor_type). Returns the result of
    backend.GetNodeInfo.
    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)
@staticmethod
def perspective_node_add(params):
    """Complete the registration of this node in the cluster.

    The first six items of params are passed positionally to
    backend.AddNode, whose result is returned.
    """
    return backend.AddNode(params[0], params[1], params[2],
                           params[3], params[4], params[5])
@staticmethod
def perspective_node_verify(params):
    """Run a verify sequence on this node.

    params[0] and params[1] are passed, in that order, to
    backend.VerifyNode, whose result is returned.
    """
    return backend.VerifyNode(params[0], params[1])
@staticmethod
def perspective_node_start_master(params):
    """Promote this node to master status.

    params[0] is passed to backend.StartMaster, whose result is returned.
    """
    return backend.StartMaster(params[0])
@staticmethod
def perspective_node_stop_master(params):
    """Demote this node from master status.

    params[0] is passed to backend.StopMaster, whose result is returned.
    """
    return backend.StopMaster(params[0])
@staticmethod
def perspective_node_leave_cluster(params):
    """Clean up after leaving a cluster.

    params is ignored. Returns the result of backend.LeaveCluster.
    """
    return backend.LeaveCluster()
@staticmethod
def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    params is ignored. Returns the result of backend.NodeVolumes.
    """
    return backend.NodeVolumes()
472 # cluster --------------------------
@staticmethod
def perspective_version(params):
    """Query version information.

    params is ignored. Returns constants.PROTOCOL_VERSION.
    """
    return constants.PROTOCOL_VERSION
@staticmethod
def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    uploads it accepts.

    The items of params are passed positionally to backend.UploadFile,
    whose result is returned.
    """
    return backend.UploadFile(*params)
@staticmethod
def perspective_master_info(params):
    """Query master information.

    params is ignored. Returns the result of backend.GetMasterInfo.
    """
    return backend.GetMasterInfo()
498 # os -----------------------
@staticmethod
def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    params is ignored. Returns a list with the ToDict() form of every
    object yielded by backend.DiagnoseOS.
    """
    # The loop variable is deliberately not called "os": in Python 2 a list
    # comprehension variable leaks into the function scope and would shadow
    # the os module.
    return [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]
508 def perspective_os_get(params):
509 """Query information about a given OS.
514 os_obj = backend.OSFromDisk(name)
515 except errors.InvalidOS, err:
516 os_obj = objects.OS.FromInvalidOS(err)
517 return os_obj.ToDict()
519 # hooks -----------------------
@staticmethod
def perspective_hooks_runner(params):
    """Run hook scripts.

    params is (hpath, phase, env). A fresh backend.HooksRunner is created
    and the result of its RunHooks method is returned.
    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)
530 # iallocator -----------------
@staticmethod
def perspective_iallocator_runner(params):
    """Run an iallocator script.

    params is (name, idata). A fresh backend.IAllocatorRunner is created
    and the result of its Run method is returned.
    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)
541 # test -----------------------
@staticmethod
def perspective_test_delay(params):
    """Run the test delay.

    params[0] is the duration, passed to utils.TestDelay, whose result is
    returned.
    """
    duration = params[0]
    return utils.TestDelay(duration)
551 # file storage ---------------
@staticmethod
def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    params[0] is the directory. Returns the result of
    backend.CreateFileStorageDir.
    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)
@staticmethod
def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    params[0] is the directory. Returns the result of
    backend.RemoveFileStorageDir.
    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)
@staticmethod
def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    params[0] is the old directory and params[1] the new one. Returns the
    result of backend.RenameFileStorageDir.
    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)
579 # jobs ------------------------
@staticmethod
@_RequireJobQueueLock
def perspective_jobqueue_update(params):
    """Update a job queue file.

    params is (file_name, content). Runs under the job queue lock taken by
    _RequireJobQueueLock. Returns the result of backend.JobQueueUpdate.
    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)
@staticmethod
@_RequireJobQueueLock
def perspective_jobqueue_purge(params):
    """Purge the job queue.

    params is ignored. Runs under the job queue lock taken by
    _RequireJobQueueLock. Returns the result of backend.JobQueuePurge.
    """
    return backend.JobQueuePurge()
@staticmethod
@_RequireJobQueueLock
def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    params is (old, new). Runs under the job queue lock taken by
    _RequireJobQueueLock. Returns the result of backend.JobQueueRename.
    """
    old, new = params
    return backend.JobQueueRename(old, new)
@staticmethod
def perspective_jobqueue_set_drain(params):
    """Set/unset the queue drain flag.

    params[0] is the flag value, passed to backend.JobQueueSetDrainFlag,
    whose result is returned.
    """
    drain_flag = params[0]
    return backend.JobQueueSetDrainFlag(drain_flag)
617 # hypervisor ---------------
@staticmethod
def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    params is (hvname, hvparams). Returns the result of
    backend.ValidateHVParams.
    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)
629 """Parse the command line options.
632 (options, args) as from OptionParser.parse_args()
635 parser = OptionParser(description="Ganeti node daemon",
636 usage="%prog [-f] [-d]",
637 version="%%prog (ganeti) %s" %
638 constants.RELEASE_VERSION)
640 parser.add_option("-f", "--foreground", dest="fork",
641 help="Don't detach from the current terminal",
642 default=True, action="store_false")
643 parser.add_option("-d", "--debug", dest="debug",
644 help="Enable some debug messages",
645 default=False, action="store_true")
646 options, args = parser.parse_args()
651 """Main function for the node daemon.
656 options, args = ParseOptions()
657 utils.debug = options.debug
658 for fname in (constants.SSL_CERT_FILE,):
659 if not os.path.isfile(fname):
660 print "config %s not there, will not run." % fname
664 port = utils.GetNodeDaemonPort()
665 pwdata = utils.GetNodeDaemonPassword()
666 except errors.ConfigurationError, err:
667 print "Cluster configuration incomplete: '%s'" % str(err)
670 # create the various SUB_RUN_DIRS, if not existing, so that we handle the
671 # situation where RUN_DIR is tmpfs
672 for dir_name in constants.SUB_RUN_DIRS:
673 if not os.path.exists(dir_name):
675 os.mkdir(dir_name, 0755)
676 except EnvironmentError, err:
677 if err.errno != errno.EEXIST:
678 print ("Node setup wrong, cannot create directory %s: %s" %
681 if not os.path.isdir(dir_name):
682 print ("Node setup wrong, %s is not a directory" % dir_name)
687 utils.Daemonize(logfile=constants.LOG_NODESERVER)
689 utils.WritePidFile(constants.NODED_PID)
691 logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
692 stderr_logging=not options.fork)
693 logging.info("ganeti node daemon startup")
696 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
698 mainloop = daemon.Mainloop()
699 server = NodeHttpServer(mainloop, ("", port))
706 utils.RemovePidFile(constants.NODED_PID)
709 if __name__ == '__main__':