4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
34 from optparse import OptionParser
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import ssconf
43 from ganeti import utils
46 class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
47 """The server implementation.
49 This class holds all methods exposed over the RPC interface.
53 """Handle a post request.
57 if path.startswith("/"):
59 mname = "perspective_%s" % path
60 if not hasattr(self, mname):
64 method = getattr(self, mname)
66 body_length = int(self.headers.get('Content-Length', '0'))
68 self.send_error(400, 'No Content-Length header or invalid format')
72 body = self.rfile.read(body_length)
73 except socket.error, err:
74 logger.Error("Socket error while reading: %s" % str(err))
77 params = simplejson.loads(body)
78 result = method(params)
79 payload = simplejson.dumps(result)
80 except Exception, err:
81 self.send_error(500, "Error: %s" % str(err))
83 self.send_response(200)
84 self.send_header('Content-Length', str(len(payload)))
86 self.wfile.write(payload)
89 def log_message(self, format, *args):
90 """Log a request to the log.
92 This is the same as the parent, we just log somewhere else.
95 msg = ("%s - - [%s] %s" %
96 (self.address_string(),
97 self.log_date_time_string(),
101 # the new block devices --------------------------
104 def perspective_blockdev_create(params):
105 """Create a block device.
108 bdev_s, size, owner, on_primary, info = params
109 bdev = objects.Disk.FromDict(bdev_s)
111 raise ValueError("can't unserialize data!")
112 return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
115 def perspective_blockdev_remove(params):
116 """Remove a block device.
120 bdev = objects.Disk.FromDict(bdev_s)
121 return backend.RemoveBlockDevice(bdev)
def perspective_blockdev_rename(params):
  """Rename a list of block devices.

  params is iterated as a sequence of two-element entries; the first
  element of each entry is a serialized disk (turned into an
  objects.Disk via FromDict) and the second is passed through
  unchanged alongside it.

  Returns whatever backend.RenameBlockDevices returns for the
  resulting list of (disk, second element) pairs.

  """
  devlist = [(objects.Disk.FromDict(disk_s), unique_id)
             for disk_s, unique_id in params]
  return backend.RenameBlockDevices(devlist)
132 def perspective_blockdev_assemble(params):
133 """Assemble a block device.
136 bdev_s, owner, on_primary = params
137 bdev = objects.Disk.FromDict(bdev_s)
139 raise ValueError("can't unserialize data!")
140 return backend.AssembleBlockDevice(bdev, owner, on_primary)
143 def perspective_blockdev_shutdown(params):
144 """Shutdown a block device.
148 bdev = objects.Disk.FromDict(bdev_s)
150 raise ValueError("can't unserialize data!")
151 return backend.ShutdownBlockDevice(bdev)
def perspective_blockdev_addchildren(params):
  """Attach new children to a mirror device.

  params must unpack into exactly two items: the serialized parent
  disk and a sequence of serialized child disks. This call only makes
  sense for mirror devices; sending a suitable disk is the caller's
  responsibility.

  Raises ValueError if the parent or any child deserializes to None.

  """
  parent_s, children_s = params
  parent = objects.Disk.FromDict(parent_s)
  children = [objects.Disk.FromDict(child_s) for child_s in children_s]
  if parent is None or children.count(None) != 0:
    raise ValueError("can't unserialize data!")
  return backend.MirrorAddChildren(parent, children)
def perspective_blockdev_removechildren(params):
  """Detach children from a mirror device.

  params must unpack into exactly two items: the serialized parent
  disk and a sequence of serialized child disks. This call only makes
  sense for mirror devices; sending a suitable disk is the caller's
  responsibility.

  Raises ValueError if the parent or any child deserializes to None.

  """
  parent_s, children_s = params
  parent = objects.Disk.FromDict(parent_s)
  children = [objects.Disk.FromDict(child_s) for child_s in children_s]
  if parent is None or children.count(None) != 0:
    raise ValueError("can't unserialize data!")
  return backend.MirrorRemoveChildren(parent, children)
184 def perspective_blockdev_getmirrorstatus(params):
185 """Return the mirror status for a list of disks.
188 disks = [objects.Disk.FromDict(dsk_s)
190 return backend.GetMirrorStatus(disks)
def perspective_blockdev_find(params):
  """Look up a disk through backend.FindBlockDevice.

  The disk is located but not activated. Only params[0], the
  serialized disk, is used.

  """
  wanted_disk = objects.Disk.FromDict(params[0])
  found = backend.FindBlockDevice(wanted_disk)
  return found
def perspective_blockdev_snapshot(params):
  """Create a snapshot of a block device.

  Only LVM disks can be snapshotted; anything else results in an
  exception. The resulting snapshot device can be removed with the
  generic block device remove call.

  Only params[0], the serialized disk, is used.

  """
  source_disk = objects.Disk.FromDict(params[0])
  snapshot = backend.SnapshotBlockDevice(source_disk)
  return snapshot
214 # export/import --------------------------
def perspective_snapshot_export(params):
  """Export a snapshot to another node.

  Reads three positional entries from params: the serialized disk,
  the destination node (passed through unchanged) and the serialized
  instance.

  """
  snap_disk = objects.Disk.FromDict(params[0])
  target_node = params[1]
  owner = objects.Instance.FromDict(params[2])
  return backend.ExportSnapshot(snap_disk, target_node, owner)
def perspective_finalize_export(params):
  """Finalize an export via backend.FinalizeExport.

  params[0] is the serialized instance and params[1] a sequence of
  serialized snapshot disks.

  """
  exported = objects.Instance.FromDict(params[0])
  disks = []
  for disk_s in params[1]:
    disks.append(objects.Disk.FromDict(disk_s))
  return backend.FinalizeExport(exported, disks)
237 def perspective_export_info(params):
238 """Query information about an existing export on this node.
240 The given path may not contain an export, in which case we return
245 einfo = backend.ExportInfo(path)
def perspective_export_list(params):
  """Return the exports available on this node.

  Unlike export_info, which can inspect an export at any path, this
  call only looks at the standard Ganeti export location
  (constants.EXPORT_DIR). The request parameters are not used.

  """
  exports = backend.ListExports()
  return exports
262 def perspective_export_remove(params):
267 return backend.RemoveExport(export)
269 # volume --------------------------
272 def perspective_volume_list(params):
273 """Query the list of logical volumes in a given volume group.
277 return backend.GetVolumeList(vgname)
def perspective_vg_list(params):
  """Return the volume groups reported by backend.ListVolumeGroups.

  The request parameters are not used.

  """
  volume_groups = backend.ListVolumeGroups()
  return volume_groups
286 # bridge --------------------------
def perspective_bridges_exist(params):
  """Check that every requested bridge exists on this node.

  Only params[0], the list of bridges, is used; the answer comes
  straight from backend.BridgesExist.

  """
  return backend.BridgesExist(params[0])
296 # instance --------------------------
def perspective_instance_os_add(params):
  """Install an OS onto an instance.

  params must unpack into exactly three items: the serialized
  instance, the OS disk and the swap disk. The two disk values are
  forwarded to the backend unchanged.

  """
  instance_s, os_dev, swap_dev = params
  target = objects.Instance.FromDict(instance_s)
  return backend.AddOSToInstance(target, os_dev, swap_dev)
def perspective_instance_run_rename(params):
  """Run the OS rename script for an instance.

  params must unpack into exactly four items: the serialized
  instance, its old name, the OS disk and the swap disk. Everything
  but the instance is forwarded to the backend unchanged.

  """
  instance_s, previous_name, os_dev, swap_dev = params
  target = objects.Instance.FromDict(instance_s)
  return backend.RunRenameInstance(target, previous_name, os_dev, swap_dev)
317 def perspective_instance_os_import(params):
318 """Run the import function of an OS onto a given instance.
321 inst_s, os_disk, swap_disk, src_node, src_image = params
322 inst = objects.Instance.FromDict(inst_s)
323 return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
def perspective_instance_shutdown(params):
  """Shut down an instance.

  Only params[0], the serialized instance, is used.

  """
  target = objects.Instance.FromDict(params[0])
  outcome = backend.ShutdownInstance(target)
  return outcome
def perspective_instance_start(params):
  """Start an instance.

  params[0] is the serialized instance; params[1] carries extra
  arguments that are handed to backend.StartInstance unchanged.

  """
  target = objects.Instance.FromDict(params[0])
  additional = params[1]
  return backend.StartInstance(target, additional)
def perspective_instance_reboot(params):
  """Reboot an instance.

  params[0] is the serialized instance, params[1] the reboot type and
  params[2] extra arguments; the last two are handed to
  backend.RebootInstance unchanged.

  """
  target = objects.Instance.FromDict(params[0])
  kind = params[1]
  additional = params[2]
  return backend.RebootInstance(target, kind, additional)
def perspective_instance_info(params):
  """Query information about one instance.

  Only params[0] is used; it is passed to backend.GetInstanceInfo
  as-is.

  """
  queried = params[0]
  return backend.GetInstanceInfo(queried)
def perspective_all_instances_info(params):
  """Query information about every instance.

  The request parameters are not used.

  """
  info = backend.GetAllInstancesInfo()
  return info
def perspective_instance_list(params):
  """Return the list of running instances.

  The request parameters are not used.

  """
  running = backend.GetInstanceList()
  return running
374 # node --------------------------
def perspective_node_tcp_ping(params):
  """Run utils.TcpPing from this node.

  Five positional entries of params are read: params[1] and params[2]
  become the two positional arguments of TcpPing, params[3] the
  timeout, params[4] the live_port_needed flag and params[0] the
  source.

  """
  positional = (params[1], params[2])
  options = {
    "timeout": params[3],
    "live_port_needed": params[4],
    "source": params[0],
    }
  return utils.TcpPing(*positional, **options)
385 def perspective_node_info(params):
386 """Query node information.
390 return backend.GetNodeInfo(vgname)
def perspective_node_add(params):
  """Complete the registration of this node in the cluster.

  The first six entries of params are forwarded, in order, as the
  positional arguments of backend.AddNode.

  """
  # Index one by one (rather than slicing) so that a too-short request
  # still fails with IndexError.
  forwarded = [params[pos] for pos in range(6)]
  return backend.AddNode(*forwarded)
def perspective_node_verify(params):
  """Run a verify sequence on this node.

  Only params[0] is used; it is passed to backend.VerifyNode as-is.

  """
  what = params[0]
  return backend.VerifyNode(what)
def perspective_node_start_master(params):
  """Promote this node to master status.

  The request parameters are not used.

  """
  outcome = backend.StartMaster()
  return outcome
def perspective_node_stop_master(params):
  """Demote this node from master status.

  The request parameters are not used.

  """
  outcome = backend.StopMaster()
  return outcome
def perspective_node_leave_cluster(params):
  """Clean up after this node has left a cluster.

  The request parameters are not used.

  """
  outcome = backend.LeaveCluster()
  return outcome
def perspective_node_volumes(params):
  """Query the list of all logical volume groups.

  Delegates to backend.NodeVolumes; the request parameters are not
  used.

  """
  volumes = backend.NodeVolumes()
  return volumes
435 # cluster --------------------------
def perspective_version(params):
  """Report the protocol version of this daemon.

  Returns constants.PROTOCOL_VERSION; the request parameters are not
  used.

  """
  protocol_version = constants.PROTOCOL_VERSION
  return protocol_version
def perspective_upload_file(params):
  """Upload a file through backend.UploadFile.

  Every entry of params is forwarded as a positional argument. The
  backend implementation is described as imposing strict rules on
  what it accepts, so no checks are done here.

  """
  upload_args = tuple(params)
  return backend.UploadFile(*upload_args)
455 # os -----------------------
def perspective_os_diagnose(params):
  """Query detailed information about existing OSes.

  Returns a list with the ToDict() form of every object yielded by
  backend.DiagnoseOS(). The request parameters are not used.

  """
  # The loop variable is deliberately not called "os": in Python 2 a
  # list-comprehension variable stays bound after the comprehension, and
  # that name would hide the standard "os" module within this function.
  return [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]
465 def perspective_os_get(params):
466 """Query information about a given OS.
471 os_obj = backend.OSFromDisk(name)
472 except errors.InvalidOS, err:
473 os_obj = objects.OS.FromInvalidOS(err)
474 return os_obj.ToDict()
476 # hooks -----------------------
def perspective_hooks_runner(params):
  """Run hook scripts through a backend.HooksRunner.

  params must unpack into exactly three items: the hooks path, the
  phase and the environment. They are passed to RunHooks in that
  order.

  """
  hooks_path, hooks_phase, hooks_env = params
  runner = backend.HooksRunner()
  return runner.RunHooks(hooks_path, hooks_phase, hooks_env)
487 # iallocator -----------------
490 def perspective_iallocator_runner(params):
491 """Run an iallocator script.
495 iar = backend.IAllocatorRunner()
496 return iar.Run(name, idata)
498 # test -----------------------
501 def perspective_test_delay(params):
506 return utils.TestDelay(duration)
def perspective_file_storage_dir_create(params):
  """Create the file storage directory.

  Only params[0], the directory, is used.

  """
  return backend.CreateFileStorageDir(params[0])
def perspective_file_storage_dir_remove(params):
  """Remove the file storage directory.

  Only params[0], the directory, is used.

  """
  return backend.RemoveFileStorageDir(params[0])
def perspective_file_storage_dir_rename(params):
  """Rename the file storage directory.

  params[0] is the current directory and params[1] the new one.

  """
  current_dir = params[0]
  wanted_dir = params[1]
  return backend.RenameFileStorageDir(current_dir, wanted_dir)
536 """Parse the command line options.
539 (options, args) as from OptionParser.parse_args()
542 parser = OptionParser(description="Ganeti node daemon",
543 usage="%prog [-f] [-d]",
544 version="%%prog (ganeti) %s" %
545 constants.RELEASE_VERSION)
547 parser.add_option("-f", "--foreground", dest="fork",
548 help="Don't detach from the current terminal",
549 default=True, action="store_false")
550 parser.add_option("-d", "--debug", dest="debug",
551 help="Enable some debug messages",
552 default=False, action="store_true")
553 options, args = parser.parse_args()
558 """Main function for the node daemon.
561 options, args = ParseOptions()
562 utils.debug = options.debug
563 for fname in (constants.SSL_CERT_FILE,):
564 if not os.path.isfile(fname):
565 print "config %s not there, will not run." % fname
569 ss = ssconf.SimpleStore()
570 port = ss.GetNodeDaemonPort()
571 pwdata = ss.GetNodeDaemonPassword()
572 except errors.ConfigurationError, err:
573 print "Cluster configuration incomplete: '%s'" % str(err)
576 # create /var/run/ganeti if not existing, in order to take care of
578 if not os.path.exists(constants.BDEV_CACHE_DIR):
580 os.mkdir(constants.BDEV_CACHE_DIR, 0755)
581 except EnvironmentError, err:
582 if err.errno != errno.EEXIST:
583 print ("Node setup wrong, cannot create directory %s: %s" %
584 (constants.BDEV_CACHE_DIR, err))
586 if not os.path.isdir(constants.BDEV_CACHE_DIR):
587 print ("Node setup wrong, %s is not a directory" %
588 constants.BDEV_CACHE_DIR)
593 utils.Daemonize(logfile=constants.LOG_NODESERVER)
595 logger.SetupLogging(twisted_workaround=True, debug=options.debug,
596 program="ganeti-noded")
598 httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject)
599 httpd.serve_forever()
602 if __name__ == '__main__':