Use constants for the pid file stems
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import ssconf
43 from ganeti import http
44 from ganeti import utils
45
46
47 class NodeDaemonRequestHandler(http.HTTPRequestHandler):
48   """The server implementation.
49
50   This class holds all methods exposed over the RPC interface.
51
52   """
53   def HandleRequest(self):
54     """Handle a request.
55
56     """
57     if self.command.upper() != "PUT":
58       raise http.HTTPBadRequest()
59
60     path = self.path
61     if path.startswith("/"):
62       path = path[1:]
63
64     method = getattr(self, "perspective_%s" % path, None)
65     if method is None:
66       raise httperror.HTTPNotFound()
67
68     try:
69       return method(self.post_data)
70     except errors.QuitGanetiException, err:
71       # Tell parent to quit
72       os.kill(self.server.noded_pid, signal.SIGTERM)
73
74   # the new block devices  --------------------------
75
76   @staticmethod
77   def perspective_blockdev_create(params):
78     """Create a block device.
79
80     """
81     bdev_s, size, owner, on_primary, info = params
82     bdev = objects.Disk.FromDict(bdev_s)
83     if bdev is None:
84       raise ValueError("can't unserialize data!")
85     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
86
87   @staticmethod
88   def perspective_blockdev_remove(params):
89     """Remove a block device.
90
91     """
92     bdev_s = params[0]
93     bdev = objects.Disk.FromDict(bdev_s)
94     return backend.RemoveBlockDevice(bdev)
95
96   @staticmethod
97   def perspective_blockdev_rename(params):
98     """Remove a block device.
99
100     """
101     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
102     return backend.RenameBlockDevices(devlist)
103
104   @staticmethod
105   def perspective_blockdev_assemble(params):
106     """Assemble a block device.
107
108     """
109     bdev_s, owner, on_primary = params
110     bdev = objects.Disk.FromDict(bdev_s)
111     if bdev is None:
112       raise ValueError("can't unserialize data!")
113     return backend.AssembleBlockDevice(bdev, owner, on_primary)
114
115   @staticmethod
116   def perspective_blockdev_shutdown(params):
117     """Shutdown a block device.
118
119     """
120     bdev_s = params[0]
121     bdev = objects.Disk.FromDict(bdev_s)
122     if bdev is None:
123       raise ValueError("can't unserialize data!")
124     return backend.ShutdownBlockDevice(bdev)
125
126   @staticmethod
127   def perspective_blockdev_addchildren(params):
128     """Add a child to a mirror device.
129
130     Note: this is only valid for mirror devices. It's the caller's duty
131     to send a correct disk, otherwise we raise an error.
132
133     """
134     bdev_s, ndev_s = params
135     bdev = objects.Disk.FromDict(bdev_s)
136     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
137     if bdev is None or ndevs.count(None) > 0:
138       raise ValueError("can't unserialize data!")
139     return backend.MirrorAddChildren(bdev, ndevs)
140
141   @staticmethod
142   def perspective_blockdev_removechildren(params):
143     """Remove a child from a mirror device.
144
145     This is only valid for mirror devices, of course. It's the callers
146     duty to send a correct disk, otherwise we raise an error.
147
148     """
149     bdev_s, ndev_s = params
150     bdev = objects.Disk.FromDict(bdev_s)
151     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
152     if bdev is None or ndevs.count(None) > 0:
153       raise ValueError("can't unserialize data!")
154     return backend.MirrorRemoveChildren(bdev, ndevs)
155
156   @staticmethod
157   def perspective_blockdev_getmirrorstatus(params):
158     """Return the mirror status for a list of disks.
159
160     """
161     disks = [objects.Disk.FromDict(dsk_s)
162             for dsk_s in params]
163     return backend.GetMirrorStatus(disks)
164
165   @staticmethod
166   def perspective_blockdev_find(params):
167     """Expose the FindBlockDevice functionality for a disk.
168
169     This will try to find but not activate a disk.
170
171     """
172     disk = objects.Disk.FromDict(params[0])
173     return backend.FindBlockDevice(disk)
174
175   @staticmethod
176   def perspective_blockdev_snapshot(params):
177     """Create a snapshot device.
178
179     Note that this is only valid for LVM disks, if we get passed
180     something else we raise an exception. The snapshot device can be
181     remove by calling the generic block device remove call.
182
183     """
184     cfbd = objects.Disk.FromDict(params[0])
185     return backend.SnapshotBlockDevice(cfbd)
186
187   @staticmethod
188   def perspective_blockdev_grow(params):
189     """Grow a stack of devices.
190
191     """
192     cfbd = objects.Disk.FromDict(params[0])
193     amount = params[1]
194     return backend.GrowBlockDevice(cfbd, amount)
195
196   @staticmethod
197   def perspective_blockdev_close(params):
198     """Closes the given block devices.
199
200     """
201     disks = [objects.Disk.FromDict(cf) for cf in params]
202     return backend.CloseBlockDevices(disks)
203
204   # export/import  --------------------------
205
206   @staticmethod
207   def perspective_snapshot_export(params):
208     """Export a given snapshot.
209
210     """
211     disk = objects.Disk.FromDict(params[0])
212     dest_node = params[1]
213     instance = objects.Instance.FromDict(params[2])
214     return backend.ExportSnapshot(disk, dest_node, instance)
215
216   @staticmethod
217   def perspective_finalize_export(params):
218     """Expose the finalize export functionality.
219
220     """
221     instance = objects.Instance.FromDict(params[0])
222     snap_disks = [objects.Disk.FromDict(str_data)
223                   for str_data in params[1]]
224     return backend.FinalizeExport(instance, snap_disks)
225
226   @staticmethod
227   def perspective_export_info(params):
228     """Query information about an existing export on this node.
229
230     The given path may not contain an export, in which case we return
231     None.
232
233     """
234     path = params[0]
235     einfo = backend.ExportInfo(path)
236     if einfo is None:
237       return einfo
238     return einfo.Dumps()
239
240   @staticmethod
241   def perspective_export_list(params):
242     """List the available exports on this node.
243
244     Note that as opposed to export_info, which may query data about an
245     export in any path, this only queries the standard Ganeti path
246     (constants.EXPORT_DIR).
247
248     """
249     return backend.ListExports()
250
251   @staticmethod
252   def perspective_export_remove(params):
253     """Remove an export.
254
255     """
256     export = params[0]
257     return backend.RemoveExport(export)
258
259   # volume  --------------------------
260
261   @staticmethod
262   def perspective_volume_list(params):
263     """Query the list of logical volumes in a given volume group.
264
265     """
266     vgname = params[0]
267     return backend.GetVolumeList(vgname)
268
269   @staticmethod
270   def perspective_vg_list(params):
271     """Query the list of volume groups.
272
273     """
274     return backend.ListVolumeGroups()
275
276   # bridge  --------------------------
277
278   @staticmethod
279   def perspective_bridges_exist(params):
280     """Check if all bridges given exist on this node.
281
282     """
283     bridges_list = params[0]
284     return backend.BridgesExist(bridges_list)
285
286   # instance  --------------------------
287
288   @staticmethod
289   def perspective_instance_os_add(params):
290     """Install an OS on a given instance.
291
292     """
293     inst_s, os_disk, swap_disk = params
294     inst = objects.Instance.FromDict(inst_s)
295     return backend.AddOSToInstance(inst, os_disk, swap_disk)
296
297   @staticmethod
298   def perspective_instance_run_rename(params):
299     """Runs the OS rename script for an instance.
300
301     """
302     inst_s, old_name, os_disk, swap_disk = params
303     inst = objects.Instance.FromDict(inst_s)
304     return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
305
306   @staticmethod
307   def perspective_instance_os_import(params):
308     """Run the import function of an OS onto a given instance.
309
310     """
311     inst_s, os_disk, swap_disk, src_node, src_image = params
312     inst = objects.Instance.FromDict(inst_s)
313     return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
314                                         src_node, src_image)
315
316   @staticmethod
317   def perspective_instance_shutdown(params):
318     """Shutdown an instance.
319
320     """
321     instance = objects.Instance.FromDict(params[0])
322     return backend.ShutdownInstance(instance)
323
324   @staticmethod
325   def perspective_instance_start(params):
326     """Start an instance.
327
328     """
329     instance = objects.Instance.FromDict(params[0])
330     extra_args = params[1]
331     return backend.StartInstance(instance, extra_args)
332
333   @staticmethod
334   def perspective_instance_migrate(params):
335     """Migrates an instance.
336
337     """
338     instance, target, live = params
339     return backend.MigrateInstance(instance, target, live)
340
341   @staticmethod
342   def perspective_instance_reboot(params):
343     """Reboot an instance.
344
345     """
346     instance = objects.Instance.FromDict(params[0])
347     reboot_type = params[1]
348     extra_args = params[2]
349     return backend.RebootInstance(instance, reboot_type, extra_args)
350
351   @staticmethod
352   def perspective_instance_info(params):
353     """Query instance information.
354
355     """
356     return backend.GetInstanceInfo(params[0])
357
358   @staticmethod
359   def perspective_all_instances_info(params):
360     """Query information about all instances.
361
362     """
363     return backend.GetAllInstancesInfo()
364
365   @staticmethod
366   def perspective_instance_list(params):
367     """Query the list of running instances.
368
369     """
370     return backend.GetInstanceList()
371
372   # node --------------------------
373
374   @staticmethod
375   def perspective_node_tcp_ping(params):
376     """Do a TcpPing on the remote node.
377
378     """
379     return utils.TcpPing(params[1], params[2], timeout=params[3],
380                          live_port_needed=params[4], source=params[0])
381
382   @staticmethod
383   def perspective_node_info(params):
384     """Query node information.
385
386     """
387     vgname = params[0]
388     return backend.GetNodeInfo(vgname)
389
390   @staticmethod
391   def perspective_node_add(params):
392     """Complete the registration of this node in the cluster.
393
394     """
395     return backend.AddNode(params[0], params[1], params[2],
396                            params[3], params[4], params[5])
397
398   @staticmethod
399   def perspective_node_verify(params):
400     """Run a verify sequence on this node.
401
402     """
403     return backend.VerifyNode(params[0])
404
405   @staticmethod
406   def perspective_node_start_master(params):
407     """Promote this node to master status.
408
409     """
410     return backend.StartMaster()
411
412   @staticmethod
413   def perspective_node_stop_master(params):
414     """Demote this node from master status.
415
416     """
417     return backend.StopMaster()
418
419   @staticmethod
420   def perspective_node_leave_cluster(params):
421     """Cleanup after leaving a cluster.
422
423     """
424     return backend.LeaveCluster()
425
426   @staticmethod
427   def perspective_node_volumes(params):
428     """Query the list of all logical volume groups.
429
430     """
431     return backend.NodeVolumes()
432
433   # cluster --------------------------
434
435   @staticmethod
436   def perspective_version(params):
437     """Query version information.
438
439     """
440     return constants.PROTOCOL_VERSION
441
442   @staticmethod
443   def perspective_upload_file(params):
444     """Upload a file.
445
446     Note that the backend implementation imposes strict rules on which
447     files are accepted.
448
449     """
450     return backend.UploadFile(*params)
451
452
453   # os -----------------------
454
455   @staticmethod
456   def perspective_os_diagnose(params):
457     """Query detailed information about existing OSes.
458
459     """
460     return [os.ToDict() for os in backend.DiagnoseOS()]
461
462   @staticmethod
463   def perspective_os_get(params):
464     """Query information about a given OS.
465
466     """
467     name = params[0]
468     try:
469       os_obj = backend.OSFromDisk(name)
470     except errors.InvalidOS, err:
471       os_obj = objects.OS.FromInvalidOS(err)
472     return os_obj.ToDict()
473
474   # hooks -----------------------
475
476   @staticmethod
477   def perspective_hooks_runner(params):
478     """Run hook scripts.
479
480     """
481     hpath, phase, env = params
482     hr = backend.HooksRunner()
483     return hr.RunHooks(hpath, phase, env)
484
485   # iallocator -----------------
486
487   @staticmethod
488   def perspective_iallocator_runner(params):
489     """Run an iallocator script.
490
491     """
492     name, idata = params
493     iar = backend.IAllocatorRunner()
494     return iar.Run(name, idata)
495
496   # test -----------------------
497
498   @staticmethod
499   def perspective_test_delay(params):
500     """Run test delay.
501
502     """
503     duration = params[0]
504     return utils.TestDelay(duration)
505
506   @staticmethod
507   def perspective_file_storage_dir_create(params):
508     """Create the file storage directory.
509
510     """
511     file_storage_dir = params[0]
512     return backend.CreateFileStorageDir(file_storage_dir)
513
514   @staticmethod
515   def perspective_file_storage_dir_remove(params):
516     """Remove the file storage directory.
517
518     """
519     file_storage_dir = params[0]
520     return backend.RemoveFileStorageDir(file_storage_dir)
521
522   @staticmethod
523   def perspective_file_storage_dir_rename(params):
524     """Rename the file storage directory.
525
526     """
527     old_file_storage_dir = params[0]
528     new_file_storage_dir = params[1]
529     return backend.RenameFileStorageDir(old_file_storage_dir,
530                                         new_file_storage_dir)
531
532
533 class NodeDaemonHttpServer(http.HTTPServer):
534   def __init__(self, server_address):
535     http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
536     self.noded_pid = os.getpid()
537
538   def serve_forever(self):
539     """Handle requests until told to quit."""
540     sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
541     try:
542       while not sighandler.called:
543         self.handle_request()
544       # TODO: There could be children running at this point
545     finally:
546       sighandler.Reset()
547
548
549 class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
550   """Forking HTTP Server.
551
552   This inherits from ForkingMixIn and HTTPServer in order to fork for each
553   request we handle. This allows more requests to be handled concurrently.
554
555   """
556
557
558 def ParseOptions():
559   """Parse the command line options.
560
561   Returns:
562     (options, args) as from OptionParser.parse_args()
563
564   """
565   parser = OptionParser(description="Ganeti node daemon",
566                         usage="%prog [-f] [-d]",
567                         version="%%prog (ganeti) %s" %
568                         constants.RELEASE_VERSION)
569
570   parser.add_option("-f", "--foreground", dest="fork",
571                     help="Don't detach from the current terminal",
572                     default=True, action="store_false")
573   parser.add_option("-d", "--debug", dest="debug",
574                     help="Enable some debug messages",
575                     default=False, action="store_true")
576   options, args = parser.parse_args()
577   return options, args
578
579
580 def main():
581   """Main function for the node daemon.
582
583   """
584   options, args = ParseOptions()
585   utils.debug = options.debug
586   for fname in (constants.SSL_CERT_FILE,):
587     if not os.path.isfile(fname):
588       print "config %s not there, will not run." % fname
589       sys.exit(5)
590
591   try:
592     ss = ssconf.SimpleStore()
593     port = ss.GetNodeDaemonPort()
594     pwdata = ss.GetNodeDaemonPassword()
595   except errors.ConfigurationError, err:
596     print "Cluster configuration incomplete: '%s'" % str(err)
597     sys.exit(5)
598
599   # create the various SUB_RUN_DIRS, if not existing, so that we handle the
600   # situation where RUN_DIR is tmpfs
601   for dir_name in constants.SUB_RUN_DIRS:
602     if not os.path.exists(dir_name):
603       try:
604         os.mkdir(dir_name, 0755)
605       except EnvironmentError, err:
606         if err.errno != errno.EEXIST:
607           print ("Node setup wrong, cannot create directory %s: %s" %
608                  (dir_name, err))
609           sys.exit(5)
610     if not os.path.isdir(dir_name):
611       print ("Node setup wrong, %s is not a directory" % dir_name)
612       sys.exit(5)
613
614   # become a daemon
615   if options.fork:
616     utils.Daemonize(logfile=constants.LOG_NODESERVER)
617
618   utils.WritePidFile(constants.NODED_PID)
619
620   logger.SetupDaemon(logfile=constants.LOG_NODESERVER, debug=options.debug,
621                      stderr_logging=not options.fork)
622   logging.info("ganeti node daemon startup")
623
624   if options.fork:
625     server = ForkingHTTPServer(('', port))
626   else:
627     server = NodeDaemonHttpServer(('', port))
628
629   try:
630     server.serve_forever()
631   finally:
632     utils.RemovePidFile(constants.NODED_PID)
633
634
635 if __name__ == '__main__':
636   main()