More logging for errors during noded RPC calls
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import ssconf
43 from ganeti import http
44 from ganeti import utils
45
46
47 class NodeDaemonRequestHandler(http.HTTPRequestHandler):
48   """The server implementation.
49
50   This class holds all methods exposed over the RPC interface.
51
52   """
53   def HandleRequest(self):
54     """Handle a request.
55
56     """
57     if self.command.upper() != "PUT":
58       raise http.HTTPBadRequest()
59
60     path = self.path
61     if path.startswith("/"):
62       path = path[1:]
63
64     method = getattr(self, "perspective_%s" % path, None)
65     if method is None:
66       raise httperror.HTTPNotFound()
67
68     try:
69       try:
70         return method(self.post_data)
71       except:
72         logging.exception("Error in RPC call")
73         raise
74     except errors.QuitGanetiException, err:
75       # Tell parent to quit
76       os.kill(self.server.noded_pid, signal.SIGTERM)
77
78   # the new block devices  --------------------------
79
80   @staticmethod
81   def perspective_blockdev_create(params):
82     """Create a block device.
83
84     """
85     bdev_s, size, owner, on_primary, info = params
86     bdev = objects.Disk.FromDict(bdev_s)
87     if bdev is None:
88       raise ValueError("can't unserialize data!")
89     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
90
91   @staticmethod
92   def perspective_blockdev_remove(params):
93     """Remove a block device.
94
95     """
96     bdev_s = params[0]
97     bdev = objects.Disk.FromDict(bdev_s)
98     return backend.RemoveBlockDevice(bdev)
99
100   @staticmethod
101   def perspective_blockdev_rename(params):
102     """Remove a block device.
103
104     """
105     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
106     return backend.RenameBlockDevices(devlist)
107
108   @staticmethod
109   def perspective_blockdev_assemble(params):
110     """Assemble a block device.
111
112     """
113     bdev_s, owner, on_primary = params
114     bdev = objects.Disk.FromDict(bdev_s)
115     if bdev is None:
116       raise ValueError("can't unserialize data!")
117     return backend.AssembleBlockDevice(bdev, owner, on_primary)
118
119   @staticmethod
120   def perspective_blockdev_shutdown(params):
121     """Shutdown a block device.
122
123     """
124     bdev_s = params[0]
125     bdev = objects.Disk.FromDict(bdev_s)
126     if bdev is None:
127       raise ValueError("can't unserialize data!")
128     return backend.ShutdownBlockDevice(bdev)
129
130   @staticmethod
131   def perspective_blockdev_addchildren(params):
132     """Add a child to a mirror device.
133
134     Note: this is only valid for mirror devices. It's the caller's duty
135     to send a correct disk, otherwise we raise an error.
136
137     """
138     bdev_s, ndev_s = params
139     bdev = objects.Disk.FromDict(bdev_s)
140     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
141     if bdev is None or ndevs.count(None) > 0:
142       raise ValueError("can't unserialize data!")
143     return backend.MirrorAddChildren(bdev, ndevs)
144
145   @staticmethod
146   def perspective_blockdev_removechildren(params):
147     """Remove a child from a mirror device.
148
149     This is only valid for mirror devices, of course. It's the callers
150     duty to send a correct disk, otherwise we raise an error.
151
152     """
153     bdev_s, ndev_s = params
154     bdev = objects.Disk.FromDict(bdev_s)
155     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
156     if bdev is None or ndevs.count(None) > 0:
157       raise ValueError("can't unserialize data!")
158     return backend.MirrorRemoveChildren(bdev, ndevs)
159
160   @staticmethod
161   def perspective_blockdev_getmirrorstatus(params):
162     """Return the mirror status for a list of disks.
163
164     """
165     disks = [objects.Disk.FromDict(dsk_s)
166             for dsk_s in params]
167     return backend.GetMirrorStatus(disks)
168
169   @staticmethod
170   def perspective_blockdev_find(params):
171     """Expose the FindBlockDevice functionality for a disk.
172
173     This will try to find but not activate a disk.
174
175     """
176     disk = objects.Disk.FromDict(params[0])
177     return backend.FindBlockDevice(disk)
178
179   @staticmethod
180   def perspective_blockdev_snapshot(params):
181     """Create a snapshot device.
182
183     Note that this is only valid for LVM disks, if we get passed
184     something else we raise an exception. The snapshot device can be
185     remove by calling the generic block device remove call.
186
187     """
188     cfbd = objects.Disk.FromDict(params[0])
189     return backend.SnapshotBlockDevice(cfbd)
190
191   @staticmethod
192   def perspective_blockdev_grow(params):
193     """Grow a stack of devices.
194
195     """
196     cfbd = objects.Disk.FromDict(params[0])
197     amount = params[1]
198     return backend.GrowBlockDevice(cfbd, amount)
199
200   @staticmethod
201   def perspective_blockdev_close(params):
202     """Closes the given block devices.
203
204     """
205     disks = [objects.Disk.FromDict(cf) for cf in params]
206     return backend.CloseBlockDevices(disks)
207
208   # export/import  --------------------------
209
210   @staticmethod
211   def perspective_snapshot_export(params):
212     """Export a given snapshot.
213
214     """
215     disk = objects.Disk.FromDict(params[0])
216     dest_node = params[1]
217     instance = objects.Instance.FromDict(params[2])
218     return backend.ExportSnapshot(disk, dest_node, instance)
219
220   @staticmethod
221   def perspective_finalize_export(params):
222     """Expose the finalize export functionality.
223
224     """
225     instance = objects.Instance.FromDict(params[0])
226     snap_disks = [objects.Disk.FromDict(str_data)
227                   for str_data in params[1]]
228     return backend.FinalizeExport(instance, snap_disks)
229
230   @staticmethod
231   def perspective_export_info(params):
232     """Query information about an existing export on this node.
233
234     The given path may not contain an export, in which case we return
235     None.
236
237     """
238     path = params[0]
239     einfo = backend.ExportInfo(path)
240     if einfo is None:
241       return einfo
242     return einfo.Dumps()
243
244   @staticmethod
245   def perspective_export_list(params):
246     """List the available exports on this node.
247
248     Note that as opposed to export_info, which may query data about an
249     export in any path, this only queries the standard Ganeti path
250     (constants.EXPORT_DIR).
251
252     """
253     return backend.ListExports()
254
255   @staticmethod
256   def perspective_export_remove(params):
257     """Remove an export.
258
259     """
260     export = params[0]
261     return backend.RemoveExport(export)
262
263   # volume  --------------------------
264
265   @staticmethod
266   def perspective_volume_list(params):
267     """Query the list of logical volumes in a given volume group.
268
269     """
270     vgname = params[0]
271     return backend.GetVolumeList(vgname)
272
273   @staticmethod
274   def perspective_vg_list(params):
275     """Query the list of volume groups.
276
277     """
278     return backend.ListVolumeGroups()
279
280   # bridge  --------------------------
281
282   @staticmethod
283   def perspective_bridges_exist(params):
284     """Check if all bridges given exist on this node.
285
286     """
287     bridges_list = params[0]
288     return backend.BridgesExist(bridges_list)
289
290   # instance  --------------------------
291
292   @staticmethod
293   def perspective_instance_os_add(params):
294     """Install an OS on a given instance.
295
296     """
297     inst_s, os_disk, swap_disk = params
298     inst = objects.Instance.FromDict(inst_s)
299     return backend.AddOSToInstance(inst, os_disk, swap_disk)
300
301   @staticmethod
302   def perspective_instance_run_rename(params):
303     """Runs the OS rename script for an instance.
304
305     """
306     inst_s, old_name, os_disk, swap_disk = params
307     inst = objects.Instance.FromDict(inst_s)
308     return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
309
310   @staticmethod
311   def perspective_instance_os_import(params):
312     """Run the import function of an OS onto a given instance.
313
314     """
315     inst_s, os_disk, swap_disk, src_node, src_image = params
316     inst = objects.Instance.FromDict(inst_s)
317     return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
318                                         src_node, src_image)
319
320   @staticmethod
321   def perspective_instance_shutdown(params):
322     """Shutdown an instance.
323
324     """
325     instance = objects.Instance.FromDict(params[0])
326     return backend.ShutdownInstance(instance)
327
328   @staticmethod
329   def perspective_instance_start(params):
330     """Start an instance.
331
332     """
333     instance = objects.Instance.FromDict(params[0])
334     extra_args = params[1]
335     return backend.StartInstance(instance, extra_args)
336
337   @staticmethod
338   def perspective_instance_migrate(params):
339     """Migrates an instance.
340
341     """
342     instance, target, live = params
343     return backend.MigrateInstance(instance, target, live)
344
345   @staticmethod
346   def perspective_instance_reboot(params):
347     """Reboot an instance.
348
349     """
350     instance = objects.Instance.FromDict(params[0])
351     reboot_type = params[1]
352     extra_args = params[2]
353     return backend.RebootInstance(instance, reboot_type, extra_args)
354
355   @staticmethod
356   def perspective_instance_info(params):
357     """Query instance information.
358
359     """
360     return backend.GetInstanceInfo(params[0])
361
362   @staticmethod
363   def perspective_all_instances_info(params):
364     """Query information about all instances.
365
366     """
367     return backend.GetAllInstancesInfo()
368
369   @staticmethod
370   def perspective_instance_list(params):
371     """Query the list of running instances.
372
373     """
374     return backend.GetInstanceList()
375
376   # node --------------------------
377
378   @staticmethod
379   def perspective_node_tcp_ping(params):
380     """Do a TcpPing on the remote node.
381
382     """
383     return utils.TcpPing(params[1], params[2], timeout=params[3],
384                          live_port_needed=params[4], source=params[0])
385
386   @staticmethod
387   def perspective_node_info(params):
388     """Query node information.
389
390     """
391     vgname = params[0]
392     return backend.GetNodeInfo(vgname)
393
394   @staticmethod
395   def perspective_node_add(params):
396     """Complete the registration of this node in the cluster.
397
398     """
399     return backend.AddNode(params[0], params[1], params[2],
400                            params[3], params[4], params[5])
401
402   @staticmethod
403   def perspective_node_verify(params):
404     """Run a verify sequence on this node.
405
406     """
407     return backend.VerifyNode(params[0])
408
409   @staticmethod
410   def perspective_node_start_master(params):
411     """Promote this node to master status.
412
413     """
414     return backend.StartMaster(params[0])
415
416   @staticmethod
417   def perspective_node_stop_master(params):
418     """Demote this node from master status.
419
420     """
421     return backend.StopMaster(params[0])
422
423   @staticmethod
424   def perspective_node_leave_cluster(params):
425     """Cleanup after leaving a cluster.
426
427     """
428     return backend.LeaveCluster()
429
430   @staticmethod
431   def perspective_node_volumes(params):
432     """Query the list of all logical volume groups.
433
434     """
435     return backend.NodeVolumes()
436
437   # cluster --------------------------
438
439   @staticmethod
440   def perspective_version(params):
441     """Query version information.
442
443     """
444     return constants.PROTOCOL_VERSION
445
446   @staticmethod
447   def perspective_upload_file(params):
448     """Upload a file.
449
450     Note that the backend implementation imposes strict rules on which
451     files are accepted.
452
453     """
454     return backend.UploadFile(*params)
455
456
457   # os -----------------------
458
459   @staticmethod
460   def perspective_os_diagnose(params):
461     """Query detailed information about existing OSes.
462
463     """
464     return [os.ToDict() for os in backend.DiagnoseOS()]
465
466   @staticmethod
467   def perspective_os_get(params):
468     """Query information about a given OS.
469
470     """
471     name = params[0]
472     try:
473       os_obj = backend.OSFromDisk(name)
474     except errors.InvalidOS, err:
475       os_obj = objects.OS.FromInvalidOS(err)
476     return os_obj.ToDict()
477
478   # hooks -----------------------
479
480   @staticmethod
481   def perspective_hooks_runner(params):
482     """Run hook scripts.
483
484     """
485     hpath, phase, env = params
486     hr = backend.HooksRunner()
487     return hr.RunHooks(hpath, phase, env)
488
489   # iallocator -----------------
490
491   @staticmethod
492   def perspective_iallocator_runner(params):
493     """Run an iallocator script.
494
495     """
496     name, idata = params
497     iar = backend.IAllocatorRunner()
498     return iar.Run(name, idata)
499
500   # test -----------------------
501
502   @staticmethod
503   def perspective_test_delay(params):
504     """Run test delay.
505
506     """
507     duration = params[0]
508     return utils.TestDelay(duration)
509
510   @staticmethod
511   def perspective_file_storage_dir_create(params):
512     """Create the file storage directory.
513
514     """
515     file_storage_dir = params[0]
516     return backend.CreateFileStorageDir(file_storage_dir)
517
518   @staticmethod
519   def perspective_file_storage_dir_remove(params):
520     """Remove the file storage directory.
521
522     """
523     file_storage_dir = params[0]
524     return backend.RemoveFileStorageDir(file_storage_dir)
525
526   @staticmethod
527   def perspective_file_storage_dir_rename(params):
528     """Rename the file storage directory.
529
530     """
531     old_file_storage_dir = params[0]
532     new_file_storage_dir = params[1]
533     return backend.RenameFileStorageDir(old_file_storage_dir,
534                                         new_file_storage_dir)
535
536   @staticmethod
537   def perspective_jobqueue_update(params):
538     """Update job queue.
539
540     """
541     (file_name, content) = params
542     return backend.JobQueueUpdate(file_name, content)
543
544   @staticmethod
545   def perspective_jobqueue_purge(params):
546     """Purge job queue.
547
548     """
549     return backend.JobQueuePurge()
550
551
552 class NodeDaemonHttpServer(http.HTTPServer):
553   def __init__(self, server_address):
554     http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
555     self.noded_pid = os.getpid()
556
557   def serve_forever(self):
558     """Handle requests until told to quit."""
559     sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
560     try:
561       while not sighandler.called:
562         self.handle_request()
563       # TODO: There could be children running at this point
564     finally:
565       sighandler.Reset()
566
567
568 class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
569   """Forking HTTP Server.
570
571   This inherits from ForkingMixIn and HTTPServer in order to fork for each
572   request we handle. This allows more requests to be handled concurrently.
573
574   """
575
576
577 def ParseOptions():
578   """Parse the command line options.
579
580   Returns:
581     (options, args) as from OptionParser.parse_args()
582
583   """
584   parser = OptionParser(description="Ganeti node daemon",
585                         usage="%prog [-f] [-d]",
586                         version="%%prog (ganeti) %s" %
587                         constants.RELEASE_VERSION)
588
589   parser.add_option("-f", "--foreground", dest="fork",
590                     help="Don't detach from the current terminal",
591                     default=True, action="store_false")
592   parser.add_option("-d", "--debug", dest="debug",
593                     help="Enable some debug messages",
594                     default=False, action="store_true")
595   options, args = parser.parse_args()
596   return options, args
597
598
599 def main():
600   """Main function for the node daemon.
601
602   """
603   options, args = ParseOptions()
604   utils.debug = options.debug
605   for fname in (constants.SSL_CERT_FILE,):
606     if not os.path.isfile(fname):
607       print "config %s not there, will not run." % fname
608       sys.exit(5)
609
610   try:
611     ss = ssconf.SimpleStore()
612     port = ss.GetNodeDaemonPort()
613     pwdata = ss.GetNodeDaemonPassword()
614   except errors.ConfigurationError, err:
615     print "Cluster configuration incomplete: '%s'" % str(err)
616     sys.exit(5)
617
618   # create the various SUB_RUN_DIRS, if not existing, so that we handle the
619   # situation where RUN_DIR is tmpfs
620   for dir_name in constants.SUB_RUN_DIRS:
621     if not os.path.exists(dir_name):
622       try:
623         os.mkdir(dir_name, 0755)
624       except EnvironmentError, err:
625         if err.errno != errno.EEXIST:
626           print ("Node setup wrong, cannot create directory %s: %s" %
627                  (dir_name, err))
628           sys.exit(5)
629     if not os.path.isdir(dir_name):
630       print ("Node setup wrong, %s is not a directory" % dir_name)
631       sys.exit(5)
632
633   # become a daemon
634   if options.fork:
635     utils.Daemonize(logfile=constants.LOG_NODESERVER)
636
637   utils.WritePidFile(constants.NODED_PID)
638
639   logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
640                       stderr_logging=not options.fork)
641   logging.info("ganeti node daemon startup")
642
643   if options.fork:
644     server = ForkingHTTPServer(('', port))
645   else:
646     server = NodeDaemonHttpServer(('', port))
647
648   try:
649     server.serve_forever()
650   finally:
651     utils.RemovePidFile(constants.NODED_PID)
652
653
654 if __name__ == '__main__':
655   main()