4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
34 from optparse import OptionParser
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import ssconf
43 from ganeti import utils
46 class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
47 """The server implementation.
49 This class holds all methods exposed over the RPC interface.
53 """Handle a post request.
57 if path.startswith("/"):
59 mname = "perspective_%s" % path
60 if not hasattr(self, mname):
64 method = getattr(self, mname)
66 body_length = int(self.headers.get('Content-Length', '0'))
68 self.send_error(400, 'No Content-Length header or invalid format')
72 body = self.rfile.read(body_length)
73 except socket.error, err:
74 logger.Error("Socket error while reading: %s" % str(err))
77 params = simplejson.loads(body)
78 result = method(params)
79 payload = simplejson.dumps(result)
80 except errors.QuitGanetiException, err:
81 global _EXIT_GANETI_NODED
82 _EXIT_GANETI_NODED = True
83 if isinstance(err, tuple) and len(err) == 2:
85 self.send_error(500, "Error: %s" % str(err[1]))
87 payload = simplejson.dumps(err[1])
89 self.log_message('GanetiQuitException Usage Error')
90 self.send_error(500, "Error: %s" % str(err))
91 except Exception, err:
92 self.send_error(500, "Error: %s" % str(err))
94 self.send_response(200)
95 self.send_header('Content-Length', str(len(payload)))
97 self.wfile.write(payload)
100 def log_message(self, format, *args):
101 """Log a request to the log.
103 This is the same as the parent, we just log somewhere else.
106 msg = ("%s - - [%s] %s" %
107 (self.address_string(),
108 self.log_date_time_string(),
112 # the new block devices --------------------------
115 def perspective_blockdev_create(params):
116 """Create a block device.
119 bdev_s, size, owner, on_primary, info = params
120 bdev = objects.Disk.FromDict(bdev_s)
122 raise ValueError("can't unserialize data!")
123 return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
126 def perspective_blockdev_remove(params):
127 """Remove a block device.
131 bdev = objects.Disk.FromDict(bdev_s)
132 return backend.RemoveBlockDevice(bdev)
135 def perspective_blockdev_rename(params):
136 """Remove a block device.
139 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
140 return backend.RenameBlockDevices(devlist)
143 def perspective_blockdev_assemble(params):
144 """Assemble a block device.
147 bdev_s, owner, on_primary = params
148 bdev = objects.Disk.FromDict(bdev_s)
150 raise ValueError("can't unserialize data!")
151 return backend.AssembleBlockDevice(bdev, owner, on_primary)
154 def perspective_blockdev_shutdown(params):
155 """Shutdown a block device.
159 bdev = objects.Disk.FromDict(bdev_s)
161 raise ValueError("can't unserialize data!")
162 return backend.ShutdownBlockDevice(bdev)
165 def perspective_blockdev_addchildren(params):
166 """Add a child to a mirror device.
168 Note: this is only valid for mirror devices. It's the caller's duty
169 to send a correct disk, otherwise we raise an error.
172 bdev_s, ndev_s = params
173 bdev = objects.Disk.FromDict(bdev_s)
174 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
175 if bdev is None or ndevs.count(None) > 0:
176 raise ValueError("can't unserialize data!")
177 return backend.MirrorAddChildren(bdev, ndevs)
180 def perspective_blockdev_removechildren(params):
181 """Remove a child from a mirror device.
183 This is only valid for mirror devices, of course. It's the callers
184 duty to send a correct disk, otherwise we raise an error.
187 bdev_s, ndev_s = params
188 bdev = objects.Disk.FromDict(bdev_s)
189 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
190 if bdev is None or ndevs.count(None) > 0:
191 raise ValueError("can't unserialize data!")
192 return backend.MirrorRemoveChildren(bdev, ndevs)
195 def perspective_blockdev_getmirrorstatus(params):
196 """Return the mirror status for a list of disks.
199 disks = [objects.Disk.FromDict(dsk_s)
201 return backend.GetMirrorStatus(disks)
204 def perspective_blockdev_find(params):
205 """Expose the FindBlockDevice functionality for a disk.
207 This will try to find but not activate a disk.
210 disk = objects.Disk.FromDict(params[0])
211 return backend.FindBlockDevice(disk)
214 def perspective_blockdev_snapshot(params):
215 """Create a snapshot device.
217 Note that this is only valid for LVM disks, if we get passed
218 something else we raise an exception. The snapshot device can be
219 remove by calling the generic block device remove call.
222 cfbd = objects.Disk.FromDict(params[0])
223 return backend.SnapshotBlockDevice(cfbd)
226 def perspective_blockdev_grow(params):
227 """Grow a stack of devices.
230 cfbd = objects.Disk.FromDict(params[0])
232 return backend.GrowBlockDevice(cfbd, amount)
235 def perspective_blockdev_close(params):
236 """Closes the given block devices.
239 disks = [objects.Disk.FromDict(cf) for cf in params]
240 return backend.CloseBlockDevices(disks)
242 # export/import --------------------------
245 def perspective_snapshot_export(params):
246 """Export a given snapshot.
249 disk = objects.Disk.FromDict(params[0])
250 dest_node = params[1]
251 instance = objects.Instance.FromDict(params[2])
252 return backend.ExportSnapshot(disk, dest_node, instance)
255 def perspective_finalize_export(params):
256 """Expose the finalize export functionality.
259 instance = objects.Instance.FromDict(params[0])
260 snap_disks = [objects.Disk.FromDict(str_data)
261 for str_data in params[1]]
262 return backend.FinalizeExport(instance, snap_disks)
265 def perspective_export_info(params):
266 """Query information about an existing export on this node.
268 The given path may not contain an export, in which case we return
273 einfo = backend.ExportInfo(path)
279 def perspective_export_list(params):
280 """List the available exports on this node.
282 Note that as opposed to export_info, which may query data about an
283 export in any path, this only queries the standard Ganeti path
284 (constants.EXPORT_DIR).
287 return backend.ListExports()
290 def perspective_export_remove(params):
295 return backend.RemoveExport(export)
297 # volume --------------------------
300 def perspective_volume_list(params):
301 """Query the list of logical volumes in a given volume group.
305 return backend.GetVolumeList(vgname)
308 def perspective_vg_list(params):
309 """Query the list of volume groups.
312 return backend.ListVolumeGroups()
314 # bridge --------------------------
317 def perspective_bridges_exist(params):
318 """Check if all bridges given exist on this node.
321 bridges_list = params[0]
322 return backend.BridgesExist(bridges_list)
324 # instance --------------------------
327 def perspective_instance_os_add(params):
328 """Install an OS on a given instance.
331 inst_s, os_disk, swap_disk = params
332 inst = objects.Instance.FromDict(inst_s)
333 return backend.AddOSToInstance(inst, os_disk, swap_disk)
336 def perspective_instance_run_rename(params):
337 """Runs the OS rename script for an instance.
340 inst_s, old_name, os_disk, swap_disk = params
341 inst = objects.Instance.FromDict(inst_s)
342 return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
345 def perspective_instance_os_import(params):
346 """Run the import function of an OS onto a given instance.
349 inst_s, os_disk, swap_disk, src_node, src_image = params
350 inst = objects.Instance.FromDict(inst_s)
351 return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
355 def perspective_instance_shutdown(params):
356 """Shutdown an instance.
359 instance = objects.Instance.FromDict(params[0])
360 return backend.ShutdownInstance(instance)
363 def perspective_instance_start(params):
364 """Start an instance.
367 instance = objects.Instance.FromDict(params[0])
368 extra_args = params[1]
369 return backend.StartInstance(instance, extra_args)
372 def perspective_instance_migrate(params):
373 """Migrates an instance.
376 instance, target, live = params
377 return backend.MigrateInstance(instance, target, live)
380 def perspective_instance_reboot(params):
381 """Reboot an instance.
384 instance = objects.Instance.FromDict(params[0])
385 reboot_type = params[1]
386 extra_args = params[2]
387 return backend.RebootInstance(instance, reboot_type, extra_args)
390 def perspective_instance_info(params):
391 """Query instance information.
394 return backend.GetInstanceInfo(params[0])
397 def perspective_all_instances_info(params):
398 """Query information about all instances.
401 return backend.GetAllInstancesInfo()
404 def perspective_instance_list(params):
405 """Query the list of running instances.
408 return backend.GetInstanceList()
410 # node --------------------------
413 def perspective_node_tcp_ping(params):
414 """Do a TcpPing on the remote node.
417 return utils.TcpPing(params[1], params[2], timeout=params[3],
418 live_port_needed=params[4], source=params[0])
421 def perspective_node_info(params):
422 """Query node information.
426 return backend.GetNodeInfo(vgname)
429 def perspective_node_add(params):
430 """Complete the registration of this node in the cluster.
433 return backend.AddNode(params[0], params[1], params[2],
434 params[3], params[4], params[5])
437 def perspective_node_verify(params):
438 """Run a verify sequence on this node.
441 return backend.VerifyNode(params[0])
444 def perspective_node_start_master(params):
445 """Promote this node to master status.
448 return backend.StartMaster()
451 def perspective_node_stop_master(params):
452 """Demote this node from master status.
455 return backend.StopMaster()
458 def perspective_node_leave_cluster(params):
459 """Cleanup after leaving a cluster.
462 return backend.LeaveCluster()
465 def perspective_node_volumes(params):
466 """Query the list of all logical volume groups.
469 return backend.NodeVolumes()
471 # cluster --------------------------
474 def perspective_version(params):
475 """Query version information.
478 return constants.PROTOCOL_VERSION
481 def perspective_upload_file(params):
484 Note that the backend implementation imposes strict rules on which
488 return backend.UploadFile(*params)
491 # os -----------------------
494 def perspective_os_diagnose(params):
495 """Query detailed information about existing OSes.
498 return [os.ToDict() for os in backend.DiagnoseOS()]
501 def perspective_os_get(params):
502 """Query information about a given OS.
507 os_obj = backend.OSFromDisk(name)
508 except errors.InvalidOS, err:
509 os_obj = objects.OS.FromInvalidOS(err)
510 return os_obj.ToDict()
512 # hooks -----------------------
515 def perspective_hooks_runner(params):
519 hpath, phase, env = params
520 hr = backend.HooksRunner()
521 return hr.RunHooks(hpath, phase, env)
523 # iallocator -----------------
526 def perspective_iallocator_runner(params):
527 """Run an iallocator script.
531 iar = backend.IAllocatorRunner()
532 return iar.Run(name, idata)
534 # test -----------------------
537 def perspective_test_delay(params):
542 return utils.TestDelay(duration)
545 def perspective_file_storage_dir_create(params):
546 """Create the file storage directory.
549 file_storage_dir = params[0]
550 return backend.CreateFileStorageDir(file_storage_dir)
553 def perspective_file_storage_dir_remove(params):
554 """Remove the file storage directory.
557 file_storage_dir = params[0]
558 return backend.RemoveFileStorageDir(file_storage_dir)
561 def perspective_file_storage_dir_rename(params):
562 """Rename the file storage directory.
565 old_file_storage_dir = params[0]
566 new_file_storage_dir = params[1]
567 return backend.RenameFileStorageDir(old_file_storage_dir,
568 new_file_storage_dir)
572 """Parse the command line options.
575 (options, args) as from OptionParser.parse_args()
578 parser = OptionParser(description="Ganeti node daemon",
579 usage="%prog [-f] [-d]",
580 version="%%prog (ganeti) %s" %
581 constants.RELEASE_VERSION)
583 parser.add_option("-f", "--foreground", dest="fork",
584 help="Don't detach from the current terminal",
585 default=True, action="store_false")
586 parser.add_option("-d", "--debug", dest="debug",
587 help="Enable some debug messages",
588 default=False, action="store_true")
589 options, args = parser.parse_args()
594 """Main function for the node daemon.
597 options, args = ParseOptions()
598 utils.debug = options.debug
599 for fname in (constants.SSL_CERT_FILE,):
600 if not os.path.isfile(fname):
601 print "config %s not there, will not run." % fname
605 ss = ssconf.SimpleStore()
606 port = ss.GetNodeDaemonPort()
607 pwdata = ss.GetNodeDaemonPassword()
608 except errors.ConfigurationError, err:
609 print "Cluster configuration incomplete: '%s'" % str(err)
612 # create /var/run/ganeti if not existing, in order to take care of
614 if not os.path.exists(constants.BDEV_CACHE_DIR):
616 os.mkdir(constants.BDEV_CACHE_DIR, 0755)
617 except EnvironmentError, err:
618 if err.errno != errno.EEXIST:
619 print ("Node setup wrong, cannot create directory %s: %s" %
620 (constants.BDEV_CACHE_DIR, err))
622 if not os.path.isdir(constants.BDEV_CACHE_DIR):
623 print ("Node setup wrong, %s is not a directory" %
624 constants.BDEV_CACHE_DIR)
629 utils.Daemonize(logfile=constants.LOG_NODESERVER)
631 logger.SetupLogging(program="ganeti-noded", debug=options.debug)
633 global _EXIT_GANETI_NODED
634 _EXIT_GANETI_NODED = False
636 httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject)
637 while (not _EXIT_GANETI_NODED):
638 httpd.handle_request()
641 if __name__ == '__main__':