4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
import errno
import logging
import os
import signal
import sys

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
def _RequireJobQueueLock(fn):
    """Decorator for job queue manipulating functions.

    The returned wrapper acquires the module-level queue_lock in exclusive,
    blocking mode (waiting at most QUEUE_LOCK_TIMEOUT seconds), calls fn
    with the arguments it was given and hands back fn's result.  The lock
    is released again whether fn returns or raises.

    """
    QUEUE_LOCK_TIMEOUT = 10

    def wrapper(*args, **kwargs):
        # Locking in exclusive, blocking mode because there could be several
        # children running at the same time. Waiting up to 10 seconds.
        queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
        try:
            return fn(*args, **kwargs)
        finally:
            # NOTE(review): the release call is not visible in this listing;
            # Unlock() is assumed to be the counterpart of Exclusive() on the
            # lock object returned by jstore.InitAndVerifyQueue -- confirm.
            queue_lock.Unlock()

    return wrapper
68 class NodeHttpServer(http.HttpServer):
69 """The server implementation.
71 This class holds all methods exposed over the RPC interface.
74 def __init__(self, *args, **kwargs):
75 http.HttpServer.__init__(self, *args, **kwargs)
76 self.noded_pid = os.getpid()
78 def HandleRequest(self, req):
82 if req.request_method.upper() != "PUT":
83 raise http.HTTPBadRequest()
85 path = req.request_path
86 if path.startswith("/"):
89 method = getattr(self, "perspective_%s" % path, None)
91 raise http.HTTPNotFound()
95 return method(req.request_post_data)
97 logging.exception("Error in RPC call")
99 except errors.QuitGanetiException, err:
100 # Tell parent to quit
101 os.kill(self.noded_pid, signal.SIGTERM)
103 # the new block devices --------------------------
106 def perspective_blockdev_create(params):
107 """Create a block device.
110 bdev_s, size, owner, on_primary, info = params
111 bdev = objects.Disk.FromDict(bdev_s)
113 raise ValueError("can't unserialize data!")
114 return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
117 def perspective_blockdev_remove(params):
118 """Remove a block device.
122 bdev = objects.Disk.FromDict(bdev_s)
123 return backend.RemoveBlockDevice(bdev)
126 def perspective_blockdev_rename(params):
127 """Remove a block device.
130 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
131 return backend.RenameBlockDevices(devlist)
134 def perspective_blockdev_assemble(params):
135 """Assemble a block device.
138 bdev_s, owner, on_primary = params
139 bdev = objects.Disk.FromDict(bdev_s)
141 raise ValueError("can't unserialize data!")
142 return backend.AssembleBlockDevice(bdev, owner, on_primary)
145 def perspective_blockdev_shutdown(params):
146 """Shutdown a block device.
150 bdev = objects.Disk.FromDict(bdev_s)
152 raise ValueError("can't unserialize data!")
153 return backend.ShutdownBlockDevice(bdev)
156 def perspective_blockdev_addchildren(params):
157 """Add a child to a mirror device.
159 Note: this is only valid for mirror devices. It's the caller's duty
160 to send a correct disk, otherwise we raise an error.
163 bdev_s, ndev_s = params
164 bdev = objects.Disk.FromDict(bdev_s)
165 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
166 if bdev is None or ndevs.count(None) > 0:
167 raise ValueError("can't unserialize data!")
168 return backend.MirrorAddChildren(bdev, ndevs)
171 def perspective_blockdev_removechildren(params):
172 """Remove a child from a mirror device.
174 This is only valid for mirror devices, of course. It's the callers
175 duty to send a correct disk, otherwise we raise an error.
178 bdev_s, ndev_s = params
179 bdev = objects.Disk.FromDict(bdev_s)
180 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
181 if bdev is None or ndevs.count(None) > 0:
182 raise ValueError("can't unserialize data!")
183 return backend.MirrorRemoveChildren(bdev, ndevs)
186 def perspective_blockdev_getmirrorstatus(params):
187 """Return the mirror status for a list of disks.
190 disks = [objects.Disk.FromDict(dsk_s)
192 return backend.GetMirrorStatus(disks)
195 def perspective_blockdev_find(params):
196 """Expose the FindBlockDevice functionality for a disk.
198 This will try to find but not activate a disk.
201 disk = objects.Disk.FromDict(params[0])
202 return backend.FindBlockDevice(disk)
205 def perspective_blockdev_snapshot(params):
206 """Create a snapshot device.
208 Note that this is only valid for LVM disks, if we get passed
209 something else we raise an exception. The snapshot device can be
210 remove by calling the generic block device remove call.
213 cfbd = objects.Disk.FromDict(params[0])
214 return backend.SnapshotBlockDevice(cfbd)
217 def perspective_blockdev_grow(params):
218 """Grow a stack of devices.
221 cfbd = objects.Disk.FromDict(params[0])
223 return backend.GrowBlockDevice(cfbd, amount)
226 def perspective_blockdev_close(params):
227 """Closes the given block devices.
230 disks = [objects.Disk.FromDict(cf) for cf in params]
231 return backend.CloseBlockDevices(disks)
233 # export/import --------------------------
236 def perspective_snapshot_export(params):
237 """Export a given snapshot.
240 disk = objects.Disk.FromDict(params[0])
241 dest_node = params[1]
242 instance = objects.Instance.FromDict(params[2])
243 cluster_name = params[3]
245 return backend.ExportSnapshot(disk, dest_node, instance,
246 cluster_name, dev_idx)
249 def perspective_finalize_export(params):
250 """Expose the finalize export functionality.
253 instance = objects.Instance.FromDict(params[0])
254 snap_disks = [objects.Disk.FromDict(str_data)
255 for str_data in params[1]]
256 return backend.FinalizeExport(instance, snap_disks)
259 def perspective_export_info(params):
260 """Query information about an existing export on this node.
262 The given path may not contain an export, in which case we return
267 einfo = backend.ExportInfo(path)
273 def perspective_export_list(params):
274 """List the available exports on this node.
276 Note that as opposed to export_info, which may query data about an
277 export in any path, this only queries the standard Ganeti path
278 (constants.EXPORT_DIR).
281 return backend.ListExports()
284 def perspective_export_remove(params):
289 return backend.RemoveExport(export)
291 # volume --------------------------
294 def perspective_volume_list(params):
295 """Query the list of logical volumes in a given volume group.
299 return backend.GetVolumeList(vgname)
302 def perspective_vg_list(params):
303 """Query the list of volume groups.
306 return backend.ListVolumeGroups()
308 # bridge --------------------------
311 def perspective_bridges_exist(params):
312 """Check if all bridges given exist on this node.
315 bridges_list = params[0]
316 return backend.BridgesExist(bridges_list)
318 # instance --------------------------
321 def perspective_instance_os_add(params):
322 """Install an OS on a given instance.
326 inst = objects.Instance.FromDict(inst_s)
327 return backend.AddOSToInstance(inst)
330 def perspective_instance_run_rename(params):
331 """Runs the OS rename script for an instance.
334 inst_s, old_name = params
335 inst = objects.Instance.FromDict(inst_s)
336 return backend.RunRenameInstance(inst, old_name)
339 def perspective_instance_os_import(params):
340 """Run the import function of an OS onto a given instance.
343 inst_s, src_node, src_images, cluster_name = params
344 inst = objects.Instance.FromDict(inst_s)
345 return backend.ImportOSIntoInstance(inst, src_node, src_images,
349 def perspective_instance_shutdown(params):
350 """Shutdown an instance.
353 instance = objects.Instance.FromDict(params[0])
354 return backend.ShutdownInstance(instance)
357 def perspective_instance_start(params):
358 """Start an instance.
361 instance = objects.Instance.FromDict(params[0])
362 extra_args = params[1]
363 return backend.StartInstance(instance, extra_args)
366 def perspective_instance_migrate(params):
367 """Migrates an instance.
370 instance, target, live = params
371 instance = objects.Instance.FromDict(instance)
372 return backend.MigrateInstance(instance, target, live)
375 def perspective_instance_reboot(params):
376 """Reboot an instance.
379 instance = objects.Instance.FromDict(params[0])
380 reboot_type = params[1]
381 extra_args = params[2]
382 return backend.RebootInstance(instance, reboot_type, extra_args)
385 def perspective_instance_info(params):
386 """Query instance information.
389 return backend.GetInstanceInfo(params[0], params[1])
392 def perspective_all_instances_info(params):
393 """Query information about all instances.
396 return backend.GetAllInstancesInfo(params[0])
399 def perspective_instance_list(params):
400 """Query the list of running instances.
403 return backend.GetInstanceList(params[0])
405 # node --------------------------
408 def perspective_node_tcp_ping(params):
409 """Do a TcpPing on the remote node.
412 return utils.TcpPing(params[1], params[2], timeout=params[3],
413 live_port_needed=params[4], source=params[0])
416 def perspective_node_has_ip_address(params):
417 """Checks if a node has the given ip address.
420 return utils.OwnIpAddress(params[0])
423 def perspective_node_info(params):
424 """Query node information.
427 vgname, hypervisor_type = params
428 return backend.GetNodeInfo(vgname, hypervisor_type)
431 def perspective_node_add(params):
432 """Complete the registration of this node in the cluster.
435 return backend.AddNode(params[0], params[1], params[2],
436 params[3], params[4], params[5])
439 def perspective_node_verify(params):
440 """Run a verify sequence on this node.
443 return backend.VerifyNode(params[0], params[1])
446 def perspective_node_start_master(params):
447 """Promote this node to master status.
450 return backend.StartMaster(params[0])
453 def perspective_node_stop_master(params):
454 """Demote this node from master status.
457 return backend.StopMaster(params[0])
460 def perspective_node_leave_cluster(params):
461 """Cleanup after leaving a cluster.
464 return backend.LeaveCluster()
467 def perspective_node_volumes(params):
468 """Query the list of all logical volume groups.
471 return backend.NodeVolumes()
473 # cluster --------------------------
476 def perspective_version(params):
477 """Query version information.
480 return constants.PROTOCOL_VERSION
483 def perspective_upload_file(params):
486 Note that the backend implementation imposes strict rules on which
490 return backend.UploadFile(*params)
493 def perspective_master_info(params):
494 """Query master information.
497 return backend.GetMasterInfo()
500 def perspective_write_ssconf_files(params):
501 """Write ssconf files.
505 return backend.WriteSsconfFiles(values)
507 # os -----------------------
510 def perspective_os_diagnose(params):
511 """Query detailed information about existing OSes.
514 return [os.ToDict() for os in backend.DiagnoseOS()]
517 def perspective_os_get(params):
518 """Query information about a given OS.
523 os_obj = backend.OSFromDisk(name)
524 except errors.InvalidOS, err:
525 os_obj = objects.OS.FromInvalidOS(err)
526 return os_obj.ToDict()
528 # hooks -----------------------
531 def perspective_hooks_runner(params):
535 hpath, phase, env = params
536 hr = backend.HooksRunner()
537 return hr.RunHooks(hpath, phase, env)
539 # iallocator -----------------
542 def perspective_iallocator_runner(params):
543 """Run an iallocator script.
547 iar = backend.IAllocatorRunner()
548 return iar.Run(name, idata)
550 # test -----------------------
553 def perspective_test_delay(params):
558 return utils.TestDelay(duration)
560 # file storage ---------------
563 def perspective_file_storage_dir_create(params):
564 """Create the file storage directory.
567 file_storage_dir = params[0]
568 return backend.CreateFileStorageDir(file_storage_dir)
571 def perspective_file_storage_dir_remove(params):
572 """Remove the file storage directory.
575 file_storage_dir = params[0]
576 return backend.RemoveFileStorageDir(file_storage_dir)
579 def perspective_file_storage_dir_rename(params):
580 """Rename the file storage directory.
583 old_file_storage_dir = params[0]
584 new_file_storage_dir = params[1]
585 return backend.RenameFileStorageDir(old_file_storage_dir,
586 new_file_storage_dir)
588 # jobs ------------------------
591 @_RequireJobQueueLock
592 def perspective_jobqueue_update(params):
596 (file_name, content) = params
597 return backend.JobQueueUpdate(file_name, content)
600 @_RequireJobQueueLock
601 def perspective_jobqueue_purge(params):
605 return backend.JobQueuePurge()
608 @_RequireJobQueueLock
609 def perspective_jobqueue_rename(params):
610 """Rename a job queue file.
615 return backend.JobQueueRename(old, new)
618 def perspective_jobqueue_set_drain(params):
619 """Set/unset the queue drain flag.
622 drain_flag = params[0]
623 return backend.JobQueueSetDrainFlag(drain_flag)
626 # hypervisor ---------------
629 def perspective_hypervisor_validate_params(params):
630 """Validate the hypervisor parameters.
633 (hvname, hvparams) = params
634 return backend.ValidateHVParams(hvname, hvparams)
def ParseOptions():
    """Parse the command line options.

    Two flags are recognised: -f/--foreground stores False in options.fork
    (default True) and -d/--debug stores True in options.debug (default
    False).

    Returns:
      (options, args) as from OptionParser.parse_args()

    """
    parser = OptionParser(description="Ganeti node daemon",
                          usage="%prog [-f] [-d]",
                          version="%%prog (ganeti) %s" %
                          constants.RELEASE_VERSION)

    parser.add_option("-f", "--foreground", dest="fork",
                      help="Don't detach from the current terminal",
                      default=True, action="store_false")
    parser.add_option("-d", "--debug", dest="debug",
                      help="Enable some debug messages",
                      default=False, action="store_true")
    options, args = parser.parse_args()
    return options, args
659 def EnsureRuntimeEnvironment():
660 """Ensure our run-time environment is complete.
662 Currently this creates directories which could be missing, either
663 due to directories being on a tmpfs mount, or due to incomplete
667 dirs = [(val, 0755) for val in constants.SUB_RUN_DIRS]
668 dirs.append((constants.LOG_OS_DIR, 0750))
669 for dir_name, dir_mode in dirs:
670 if not os.path.exists(dir_name):
672 os.mkdir(dir_name, dir_mode)
673 except EnvironmentError, err:
674 if err.errno != errno.EEXIST:
675 print ("Node setup wrong, cannot create directory '%s': %s" %
678 if not os.path.isdir(dir_name):
679 print ("Node setup wrong, '%s' is not a directory" % dir_name)
684 """Main function for the node daemon.
689 options, args = ParseOptions()
690 utils.debug = options.debug
691 for fname in (constants.SSL_CERT_FILE,):
692 if not os.path.isfile(fname):
693 print "config %s not there, will not run." % fname
697 port = utils.GetNodeDaemonPort()
698 except errors.ConfigurationError, err:
699 print "Cluster configuration incomplete: '%s'" % str(err)
702 EnsureRuntimeEnvironment()
706 utils.Daemonize(logfile=constants.LOG_NODESERVER)
708 utils.WritePidFile(constants.NODED_PID)
710 utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
711 stderr_logging=not options.fork)
712 logging.info("ganeti node daemon startup")
714 # Read SSL certificate
715 ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
716 ssl_cert_path=constants.SSL_CERT_FILE)
719 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
721 mainloop = daemon.Mainloop()
722 server = NodeHttpServer(mainloop, "", port,
723 ssl_params=ssl_params, ssl_verify_peer=True)
730 utils.RemovePidFile(constants.NODED_PID)
733 if __name__ == '__main__':