Fork ganeti-noded
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33
34 from optparse import OptionParser
35
36
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import ssconf
43 from ganeti import http
44 from ganeti import utils
45
46 _EXIT_GANETI_NODED = False
47
48
49 class NodeDaemonRequestHandler(http.HTTPRequestHandler):
50   """The server implementation.
51
52   This class holds all methods exposed over the RPC interface.
53
54   """
55   def HandleRequest(self):
56     """Handle a request.
57
58     """
59     global _EXIT_GANETI_NODED
60
61     if self.command.upper() != "PUT":
62       raise http.HTTPBadRequest()
63
64     path = self.path
65     if path.startswith("/"):
66       path = path[1:]
67
68     method = getattr(self, "perspective_%s" % path, None)
69     if method is None:
70       raise httperror.HTTPNotFound()
71
72     try:
73       return method(self.post_data)
74     except errors.QuitGanetiException, err:
75       _EXIT_GANETI_NODED = True
76
77   # the new block devices  --------------------------
78
79   @staticmethod
80   def perspective_blockdev_create(params):
81     """Create a block device.
82
83     """
84     bdev_s, size, owner, on_primary, info = params
85     bdev = objects.Disk.FromDict(bdev_s)
86     if bdev is None:
87       raise ValueError("can't unserialize data!")
88     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
89
90   @staticmethod
91   def perspective_blockdev_remove(params):
92     """Remove a block device.
93
94     """
95     bdev_s = params[0]
96     bdev = objects.Disk.FromDict(bdev_s)
97     return backend.RemoveBlockDevice(bdev)
98
99   @staticmethod
100   def perspective_blockdev_rename(params):
101     """Remove a block device.
102
103     """
104     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
105     return backend.RenameBlockDevices(devlist)
106
107   @staticmethod
108   def perspective_blockdev_assemble(params):
109     """Assemble a block device.
110
111     """
112     bdev_s, owner, on_primary = params
113     bdev = objects.Disk.FromDict(bdev_s)
114     if bdev is None:
115       raise ValueError("can't unserialize data!")
116     return backend.AssembleBlockDevice(bdev, owner, on_primary)
117
118   @staticmethod
119   def perspective_blockdev_shutdown(params):
120     """Shutdown a block device.
121
122     """
123     bdev_s = params[0]
124     bdev = objects.Disk.FromDict(bdev_s)
125     if bdev is None:
126       raise ValueError("can't unserialize data!")
127     return backend.ShutdownBlockDevice(bdev)
128
129   @staticmethod
130   def perspective_blockdev_addchildren(params):
131     """Add a child to a mirror device.
132
133     Note: this is only valid for mirror devices. It's the caller's duty
134     to send a correct disk, otherwise we raise an error.
135
136     """
137     bdev_s, ndev_s = params
138     bdev = objects.Disk.FromDict(bdev_s)
139     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
140     if bdev is None or ndevs.count(None) > 0:
141       raise ValueError("can't unserialize data!")
142     return backend.MirrorAddChildren(bdev, ndevs)
143
144   @staticmethod
145   def perspective_blockdev_removechildren(params):
146     """Remove a child from a mirror device.
147
148     This is only valid for mirror devices, of course. It's the callers
149     duty to send a correct disk, otherwise we raise an error.
150
151     """
152     bdev_s, ndev_s = params
153     bdev = objects.Disk.FromDict(bdev_s)
154     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
155     if bdev is None or ndevs.count(None) > 0:
156       raise ValueError("can't unserialize data!")
157     return backend.MirrorRemoveChildren(bdev, ndevs)
158
159   @staticmethod
160   def perspective_blockdev_getmirrorstatus(params):
161     """Return the mirror status for a list of disks.
162
163     """
164     disks = [objects.Disk.FromDict(dsk_s)
165             for dsk_s in params]
166     return backend.GetMirrorStatus(disks)
167
168   @staticmethod
169   def perspective_blockdev_find(params):
170     """Expose the FindBlockDevice functionality for a disk.
171
172     This will try to find but not activate a disk.
173
174     """
175     disk = objects.Disk.FromDict(params[0])
176     return backend.FindBlockDevice(disk)
177
178   @staticmethod
179   def perspective_blockdev_snapshot(params):
180     """Create a snapshot device.
181
182     Note that this is only valid for LVM disks, if we get passed
183     something else we raise an exception. The snapshot device can be
184     remove by calling the generic block device remove call.
185
186     """
187     cfbd = objects.Disk.FromDict(params[0])
188     return backend.SnapshotBlockDevice(cfbd)
189
190   @staticmethod
191   def perspective_blockdev_grow(params):
192     """Grow a stack of devices.
193
194     """
195     cfbd = objects.Disk.FromDict(params[0])
196     amount = params[1]
197     return backend.GrowBlockDevice(cfbd, amount)
198
199   @staticmethod
200   def perspective_blockdev_close(params):
201     """Closes the given block devices.
202
203     """
204     disks = [objects.Disk.FromDict(cf) for cf in params]
205     return backend.CloseBlockDevices(disks)
206
207   # export/import  --------------------------
208
209   @staticmethod
210   def perspective_snapshot_export(params):
211     """Export a given snapshot.
212
213     """
214     disk = objects.Disk.FromDict(params[0])
215     dest_node = params[1]
216     instance = objects.Instance.FromDict(params[2])
217     return backend.ExportSnapshot(disk, dest_node, instance)
218
219   @staticmethod
220   def perspective_finalize_export(params):
221     """Expose the finalize export functionality.
222
223     """
224     instance = objects.Instance.FromDict(params[0])
225     snap_disks = [objects.Disk.FromDict(str_data)
226                   for str_data in params[1]]
227     return backend.FinalizeExport(instance, snap_disks)
228
229   @staticmethod
230   def perspective_export_info(params):
231     """Query information about an existing export on this node.
232
233     The given path may not contain an export, in which case we return
234     None.
235
236     """
237     path = params[0]
238     einfo = backend.ExportInfo(path)
239     if einfo is None:
240       return einfo
241     return einfo.Dumps()
242
243   @staticmethod
244   def perspective_export_list(params):
245     """List the available exports on this node.
246
247     Note that as opposed to export_info, which may query data about an
248     export in any path, this only queries the standard Ganeti path
249     (constants.EXPORT_DIR).
250
251     """
252     return backend.ListExports()
253
254   @staticmethod
255   def perspective_export_remove(params):
256     """Remove an export.
257
258     """
259     export = params[0]
260     return backend.RemoveExport(export)
261
262   # volume  --------------------------
263
264   @staticmethod
265   def perspective_volume_list(params):
266     """Query the list of logical volumes in a given volume group.
267
268     """
269     vgname = params[0]
270     return backend.GetVolumeList(vgname)
271
272   @staticmethod
273   def perspective_vg_list(params):
274     """Query the list of volume groups.
275
276     """
277     return backend.ListVolumeGroups()
278
279   # bridge  --------------------------
280
281   @staticmethod
282   def perspective_bridges_exist(params):
283     """Check if all bridges given exist on this node.
284
285     """
286     bridges_list = params[0]
287     return backend.BridgesExist(bridges_list)
288
289   # instance  --------------------------
290
291   @staticmethod
292   def perspective_instance_os_add(params):
293     """Install an OS on a given instance.
294
295     """
296     inst_s, os_disk, swap_disk = params
297     inst = objects.Instance.FromDict(inst_s)
298     return backend.AddOSToInstance(inst, os_disk, swap_disk)
299
300   @staticmethod
301   def perspective_instance_run_rename(params):
302     """Runs the OS rename script for an instance.
303
304     """
305     inst_s, old_name, os_disk, swap_disk = params
306     inst = objects.Instance.FromDict(inst_s)
307     return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
308
309   @staticmethod
310   def perspective_instance_os_import(params):
311     """Run the import function of an OS onto a given instance.
312
313     """
314     inst_s, os_disk, swap_disk, src_node, src_image = params
315     inst = objects.Instance.FromDict(inst_s)
316     return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
317                                         src_node, src_image)
318
319   @staticmethod
320   def perspective_instance_shutdown(params):
321     """Shutdown an instance.
322
323     """
324     instance = objects.Instance.FromDict(params[0])
325     return backend.ShutdownInstance(instance)
326
327   @staticmethod
328   def perspective_instance_start(params):
329     """Start an instance.
330
331     """
332     instance = objects.Instance.FromDict(params[0])
333     extra_args = params[1]
334     return backend.StartInstance(instance, extra_args)
335
336   @staticmethod
337   def perspective_instance_migrate(params):
338     """Migrates an instance.
339
340     """
341     instance, target, live = params
342     return backend.MigrateInstance(instance, target, live)
343
344   @staticmethod
345   def perspective_instance_reboot(params):
346     """Reboot an instance.
347
348     """
349     instance = objects.Instance.FromDict(params[0])
350     reboot_type = params[1]
351     extra_args = params[2]
352     return backend.RebootInstance(instance, reboot_type, extra_args)
353
354   @staticmethod
355   def perspective_instance_info(params):
356     """Query instance information.
357
358     """
359     return backend.GetInstanceInfo(params[0])
360
361   @staticmethod
362   def perspective_all_instances_info(params):
363     """Query information about all instances.
364
365     """
366     return backend.GetAllInstancesInfo()
367
368   @staticmethod
369   def perspective_instance_list(params):
370     """Query the list of running instances.
371
372     """
373     return backend.GetInstanceList()
374
375   # node --------------------------
376
377   @staticmethod
378   def perspective_node_tcp_ping(params):
379     """Do a TcpPing on the remote node.
380
381     """
382     return utils.TcpPing(params[1], params[2], timeout=params[3],
383                          live_port_needed=params[4], source=params[0])
384
385   @staticmethod
386   def perspective_node_info(params):
387     """Query node information.
388
389     """
390     vgname = params[0]
391     return backend.GetNodeInfo(vgname)
392
393   @staticmethod
394   def perspective_node_add(params):
395     """Complete the registration of this node in the cluster.
396
397     """
398     return backend.AddNode(params[0], params[1], params[2],
399                            params[3], params[4], params[5])
400
401   @staticmethod
402   def perspective_node_verify(params):
403     """Run a verify sequence on this node.
404
405     """
406     return backend.VerifyNode(params[0])
407
408   @staticmethod
409   def perspective_node_start_master(params):
410     """Promote this node to master status.
411
412     """
413     return backend.StartMaster()
414
415   @staticmethod
416   def perspective_node_stop_master(params):
417     """Demote this node from master status.
418
419     """
420     return backend.StopMaster()
421
422   @staticmethod
423   def perspective_node_leave_cluster(params):
424     """Cleanup after leaving a cluster.
425
426     """
427     return backend.LeaveCluster()
428
429   @staticmethod
430   def perspective_node_volumes(params):
431     """Query the list of all logical volume groups.
432
433     """
434     return backend.NodeVolumes()
435
436   # cluster --------------------------
437
438   @staticmethod
439   def perspective_version(params):
440     """Query version information.
441
442     """
443     return constants.PROTOCOL_VERSION
444
445   @staticmethod
446   def perspective_upload_file(params):
447     """Upload a file.
448
449     Note that the backend implementation imposes strict rules on which
450     files are accepted.
451
452     """
453     return backend.UploadFile(*params)
454
455
456   # os -----------------------
457
458   @staticmethod
459   def perspective_os_diagnose(params):
460     """Query detailed information about existing OSes.
461
462     """
463     return [os.ToDict() for os in backend.DiagnoseOS()]
464
465   @staticmethod
466   def perspective_os_get(params):
467     """Query information about a given OS.
468
469     """
470     name = params[0]
471     try:
472       os_obj = backend.OSFromDisk(name)
473     except errors.InvalidOS, err:
474       os_obj = objects.OS.FromInvalidOS(err)
475     return os_obj.ToDict()
476
477   # hooks -----------------------
478
479   @staticmethod
480   def perspective_hooks_runner(params):
481     """Run hook scripts.
482
483     """
484     hpath, phase, env = params
485     hr = backend.HooksRunner()
486     return hr.RunHooks(hpath, phase, env)
487
488   # iallocator -----------------
489
490   @staticmethod
491   def perspective_iallocator_runner(params):
492     """Run an iallocator script.
493
494     """
495     name, idata = params
496     iar = backend.IAllocatorRunner()
497     return iar.Run(name, idata)
498
499   # test -----------------------
500
501   @staticmethod
502   def perspective_test_delay(params):
503     """Run test delay.
504
505     """
506     duration = params[0]
507     return utils.TestDelay(duration)
508
509   @staticmethod
510   def perspective_file_storage_dir_create(params):
511     """Create the file storage directory.
512
513     """
514     file_storage_dir = params[0]
515     return backend.CreateFileStorageDir(file_storage_dir)
516
517   @staticmethod
518   def perspective_file_storage_dir_remove(params):
519     """Remove the file storage directory.
520
521     """
522     file_storage_dir = params[0]
523     return backend.RemoveFileStorageDir(file_storage_dir)
524
525   @staticmethod
526   def perspective_file_storage_dir_rename(params):
527     """Rename the file storage directory.
528
529     """
530     old_file_storage_dir = params[0]
531     new_file_storage_dir = params[1]
532     return backend.RenameFileStorageDir(old_file_storage_dir,
533                                         new_file_storage_dir)
534
535
536 class NodeDaemonHttpServer(http.HTTPServer):
537   def __init__(self, server_address):
538     http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
539
540
541 class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
542   """Forking HTTP Server.
543
544   This inherits from ForkingMixIn and HTTPServer in order to fork for each
545   request we handle. This allows more requests to be handled concurrently.
546
547   """
548
549
550 def ParseOptions():
551   """Parse the command line options.
552
553   Returns:
554     (options, args) as from OptionParser.parse_args()
555
556   """
557   parser = OptionParser(description="Ganeti node daemon",
558                         usage="%prog [-f] [-d]",
559                         version="%%prog (ganeti) %s" %
560                         constants.RELEASE_VERSION)
561
562   parser.add_option("-f", "--foreground", dest="fork",
563                     help="Don't detach from the current terminal",
564                     default=True, action="store_false")
565   parser.add_option("-d", "--debug", dest="debug",
566                     help="Enable some debug messages",
567                     default=False, action="store_true")
568   options, args = parser.parse_args()
569   return options, args
570
571
572 def main():
573   """Main function for the node daemon.
574
575   """
576   options, args = ParseOptions()
577   utils.debug = options.debug
578   for fname in (constants.SSL_CERT_FILE,):
579     if not os.path.isfile(fname):
580       print "config %s not there, will not run." % fname
581       sys.exit(5)
582
583   try:
584     ss = ssconf.SimpleStore()
585     port = ss.GetNodeDaemonPort()
586     pwdata = ss.GetNodeDaemonPassword()
587   except errors.ConfigurationError, err:
588     print "Cluster configuration incomplete: '%s'" % str(err)
589     sys.exit(5)
590
591   # create the various SUB_RUN_DIRS, if not existing, so that we handle the
592   # situation where RUN_DIR is tmpfs
593   for dir_name in constants.SUB_RUN_DIRS:
594     if not os.path.exists(dir_name):
595       try:
596         os.mkdir(dir_name, 0755)
597       except EnvironmentError, err:
598         if err.errno != errno.EEXIST:
599           print ("Node setup wrong, cannot create directory %s: %s" %
600                  (dir_name, err))
601           sys.exit(5)
602     if not os.path.isdir(dir_name):
603       print ("Node setup wrong, %s is not a directory" % dir_name)
604       sys.exit(5)
605
606   # become a daemon
607   if options.fork:
608     utils.Daemonize(logfile=constants.LOG_NODESERVER)
609
610   logger.SetupDaemon(logfile=constants.LOG_NODESERVER, debug=options.debug,
611                      stderr_logging=not options.fork)
612   logging.info("ganeti node daemon startup")
613
614   global _EXIT_GANETI_NODED
615
616   if options.fork:
617     httpd = ForkingHTTPServer(('', port))
618   else:
619     httpd = NodeDaemonHttpServer(('', port))
620
621   # FIXME: updating _EXIT_GANETI_NODED doesn't work when forking
622   while (not _EXIT_GANETI_NODED):
623     httpd.handle_request()
624
625
626 if __name__ == '__main__':
627   main()