Add output of job/opcode timestamps
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import jstore
43 from ganeti import ssconf
44 from ganeti import http
45 from ganeti import utils
46
47
48 queue_lock = None
49
50
51 def _RequireJobQueueLock(fn):
52   """Decorator for job queue manipulating functions.
53
54   """
55   QUEUE_LOCK_TIMEOUT = 10
56
57   def wrapper(*args, **kwargs):
58     # Locking in exclusive, blocking mode because there could be several
59     # children running at the same time. Waiting up to 10 seconds.
60     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
61     try:
62       return fn(*args, **kwargs)
63     finally:
64       queue_lock.Unlock()
65
66   return wrapper
67
68
69 class NodeDaemonRequestHandler(http.HTTPRequestHandler):
70   """The server implementation.
71
72   This class holds all methods exposed over the RPC interface.
73
74   """
75   def HandleRequest(self):
76     """Handle a request.
77
78     """
79     if self.command.upper() != "PUT":
80       raise http.HTTPBadRequest()
81
82     path = self.path
83     if path.startswith("/"):
84       path = path[1:]
85
86     method = getattr(self, "perspective_%s" % path, None)
87     if method is None:
88       raise httperror.HTTPNotFound()
89
90     try:
91       try:
92         return method(self.post_data)
93       except:
94         logging.exception("Error in RPC call")
95         raise
96     except errors.QuitGanetiException, err:
97       # Tell parent to quit
98       os.kill(self.server.noded_pid, signal.SIGTERM)
99
100   # the new block devices  --------------------------
101
102   @staticmethod
103   def perspective_blockdev_create(params):
104     """Create a block device.
105
106     """
107     bdev_s, size, owner, on_primary, info = params
108     bdev = objects.Disk.FromDict(bdev_s)
109     if bdev is None:
110       raise ValueError("can't unserialize data!")
111     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
112
113   @staticmethod
114   def perspective_blockdev_remove(params):
115     """Remove a block device.
116
117     """
118     bdev_s = params[0]
119     bdev = objects.Disk.FromDict(bdev_s)
120     return backend.RemoveBlockDevice(bdev)
121
122   @staticmethod
123   def perspective_blockdev_rename(params):
124     """Remove a block device.
125
126     """
127     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
128     return backend.RenameBlockDevices(devlist)
129
130   @staticmethod
131   def perspective_blockdev_assemble(params):
132     """Assemble a block device.
133
134     """
135     bdev_s, owner, on_primary = params
136     bdev = objects.Disk.FromDict(bdev_s)
137     if bdev is None:
138       raise ValueError("can't unserialize data!")
139     return backend.AssembleBlockDevice(bdev, owner, on_primary)
140
141   @staticmethod
142   def perspective_blockdev_shutdown(params):
143     """Shutdown a block device.
144
145     """
146     bdev_s = params[0]
147     bdev = objects.Disk.FromDict(bdev_s)
148     if bdev is None:
149       raise ValueError("can't unserialize data!")
150     return backend.ShutdownBlockDevice(bdev)
151
152   @staticmethod
153   def perspective_blockdev_addchildren(params):
154     """Add a child to a mirror device.
155
156     Note: this is only valid for mirror devices. It's the caller's duty
157     to send a correct disk, otherwise we raise an error.
158
159     """
160     bdev_s, ndev_s = params
161     bdev = objects.Disk.FromDict(bdev_s)
162     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
163     if bdev is None or ndevs.count(None) > 0:
164       raise ValueError("can't unserialize data!")
165     return backend.MirrorAddChildren(bdev, ndevs)
166
167   @staticmethod
168   def perspective_blockdev_removechildren(params):
169     """Remove a child from a mirror device.
170
171     This is only valid for mirror devices, of course. It's the callers
172     duty to send a correct disk, otherwise we raise an error.
173
174     """
175     bdev_s, ndev_s = params
176     bdev = objects.Disk.FromDict(bdev_s)
177     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
178     if bdev is None or ndevs.count(None) > 0:
179       raise ValueError("can't unserialize data!")
180     return backend.MirrorRemoveChildren(bdev, ndevs)
181
182   @staticmethod
183   def perspective_blockdev_getmirrorstatus(params):
184     """Return the mirror status for a list of disks.
185
186     """
187     disks = [objects.Disk.FromDict(dsk_s)
188             for dsk_s in params]
189     return backend.GetMirrorStatus(disks)
190
191   @staticmethod
192   def perspective_blockdev_find(params):
193     """Expose the FindBlockDevice functionality for a disk.
194
195     This will try to find but not activate a disk.
196
197     """
198     disk = objects.Disk.FromDict(params[0])
199     return backend.FindBlockDevice(disk)
200
201   @staticmethod
202   def perspective_blockdev_snapshot(params):
203     """Create a snapshot device.
204
205     Note that this is only valid for LVM disks, if we get passed
206     something else we raise an exception. The snapshot device can be
207     remove by calling the generic block device remove call.
208
209     """
210     cfbd = objects.Disk.FromDict(params[0])
211     return backend.SnapshotBlockDevice(cfbd)
212
213   @staticmethod
214   def perspective_blockdev_grow(params):
215     """Grow a stack of devices.
216
217     """
218     cfbd = objects.Disk.FromDict(params[0])
219     amount = params[1]
220     return backend.GrowBlockDevice(cfbd, amount)
221
222   @staticmethod
223   def perspective_blockdev_close(params):
224     """Closes the given block devices.
225
226     """
227     disks = [objects.Disk.FromDict(cf) for cf in params]
228     return backend.CloseBlockDevices(disks)
229
230   # export/import  --------------------------
231
232   @staticmethod
233   def perspective_snapshot_export(params):
234     """Export a given snapshot.
235
236     """
237     disk = objects.Disk.FromDict(params[0])
238     dest_node = params[1]
239     instance = objects.Instance.FromDict(params[2])
240     return backend.ExportSnapshot(disk, dest_node, instance)
241
242   @staticmethod
243   def perspective_finalize_export(params):
244     """Expose the finalize export functionality.
245
246     """
247     instance = objects.Instance.FromDict(params[0])
248     snap_disks = [objects.Disk.FromDict(str_data)
249                   for str_data in params[1]]
250     return backend.FinalizeExport(instance, snap_disks)
251
252   @staticmethod
253   def perspective_export_info(params):
254     """Query information about an existing export on this node.
255
256     The given path may not contain an export, in which case we return
257     None.
258
259     """
260     path = params[0]
261     einfo = backend.ExportInfo(path)
262     if einfo is None:
263       return einfo
264     return einfo.Dumps()
265
266   @staticmethod
267   def perspective_export_list(params):
268     """List the available exports on this node.
269
270     Note that as opposed to export_info, which may query data about an
271     export in any path, this only queries the standard Ganeti path
272     (constants.EXPORT_DIR).
273
274     """
275     return backend.ListExports()
276
277   @staticmethod
278   def perspective_export_remove(params):
279     """Remove an export.
280
281     """
282     export = params[0]
283     return backend.RemoveExport(export)
284
285   # volume  --------------------------
286
287   @staticmethod
288   def perspective_volume_list(params):
289     """Query the list of logical volumes in a given volume group.
290
291     """
292     vgname = params[0]
293     return backend.GetVolumeList(vgname)
294
295   @staticmethod
296   def perspective_vg_list(params):
297     """Query the list of volume groups.
298
299     """
300     return backend.ListVolumeGroups()
301
302   # bridge  --------------------------
303
304   @staticmethod
305   def perspective_bridges_exist(params):
306     """Check if all bridges given exist on this node.
307
308     """
309     bridges_list = params[0]
310     return backend.BridgesExist(bridges_list)
311
312   # instance  --------------------------
313
314   @staticmethod
315   def perspective_instance_os_add(params):
316     """Install an OS on a given instance.
317
318     """
319     inst_s, os_disk, swap_disk = params
320     inst = objects.Instance.FromDict(inst_s)
321     return backend.AddOSToInstance(inst, os_disk, swap_disk)
322
323   @staticmethod
324   def perspective_instance_run_rename(params):
325     """Runs the OS rename script for an instance.
326
327     """
328     inst_s, old_name, os_disk, swap_disk = params
329     inst = objects.Instance.FromDict(inst_s)
330     return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
331
332   @staticmethod
333   def perspective_instance_os_import(params):
334     """Run the import function of an OS onto a given instance.
335
336     """
337     inst_s, os_disk, swap_disk, src_node, src_image = params
338     inst = objects.Instance.FromDict(inst_s)
339     return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
340                                         src_node, src_image)
341
342   @staticmethod
343   def perspective_instance_shutdown(params):
344     """Shutdown an instance.
345
346     """
347     instance = objects.Instance.FromDict(params[0])
348     return backend.ShutdownInstance(instance)
349
350   @staticmethod
351   def perspective_instance_start(params):
352     """Start an instance.
353
354     """
355     instance = objects.Instance.FromDict(params[0])
356     extra_args = params[1]
357     return backend.StartInstance(instance, extra_args)
358
359   @staticmethod
360   def perspective_instance_migrate(params):
361     """Migrates an instance.
362
363     """
364     instance, target, live = params
365     return backend.MigrateInstance(instance, target, live)
366
367   @staticmethod
368   def perspective_instance_reboot(params):
369     """Reboot an instance.
370
371     """
372     instance = objects.Instance.FromDict(params[0])
373     reboot_type = params[1]
374     extra_args = params[2]
375     return backend.RebootInstance(instance, reboot_type, extra_args)
376
377   @staticmethod
378   def perspective_instance_info(params):
379     """Query instance information.
380
381     """
382     return backend.GetInstanceInfo(params[0])
383
384   @staticmethod
385   def perspective_all_instances_info(params):
386     """Query information about all instances.
387
388     """
389     return backend.GetAllInstancesInfo()
390
391   @staticmethod
392   def perspective_instance_list(params):
393     """Query the list of running instances.
394
395     """
396     return backend.GetInstanceList()
397
398   # node --------------------------
399
400   @staticmethod
401   def perspective_node_tcp_ping(params):
402     """Do a TcpPing on the remote node.
403
404     """
405     return utils.TcpPing(params[1], params[2], timeout=params[3],
406                          live_port_needed=params[4], source=params[0])
407
408   @staticmethod
409   def perspective_node_info(params):
410     """Query node information.
411
412     """
413     vgname = params[0]
414     return backend.GetNodeInfo(vgname)
415
416   @staticmethod
417   def perspective_node_add(params):
418     """Complete the registration of this node in the cluster.
419
420     """
421     return backend.AddNode(params[0], params[1], params[2],
422                            params[3], params[4], params[5])
423
424   @staticmethod
425   def perspective_node_verify(params):
426     """Run a verify sequence on this node.
427
428     """
429     return backend.VerifyNode(params[0])
430
431   @staticmethod
432   def perspective_node_start_master(params):
433     """Promote this node to master status.
434
435     """
436     return backend.StartMaster(params[0])
437
438   @staticmethod
439   def perspective_node_stop_master(params):
440     """Demote this node from master status.
441
442     """
443     return backend.StopMaster(params[0])
444
445   @staticmethod
446   def perspective_node_leave_cluster(params):
447     """Cleanup after leaving a cluster.
448
449     """
450     return backend.LeaveCluster()
451
452   @staticmethod
453   def perspective_node_volumes(params):
454     """Query the list of all logical volume groups.
455
456     """
457     return backend.NodeVolumes()
458
459   # cluster --------------------------
460
461   @staticmethod
462   def perspective_version(params):
463     """Query version information.
464
465     """
466     return constants.PROTOCOL_VERSION
467
468   @staticmethod
469   def perspective_upload_file(params):
470     """Upload a file.
471
472     Note that the backend implementation imposes strict rules on which
473     files are accepted.
474
475     """
476     return backend.UploadFile(*params)
477
478   @staticmethod
479   def perspective_master_info(params):
480     """Query master information.
481
482     """
483     return backend.GetMasterInfo()
484
485   # os -----------------------
486
487   @staticmethod
488   def perspective_os_diagnose(params):
489     """Query detailed information about existing OSes.
490
491     """
492     return [os.ToDict() for os in backend.DiagnoseOS()]
493
494   @staticmethod
495   def perspective_os_get(params):
496     """Query information about a given OS.
497
498     """
499     name = params[0]
500     try:
501       os_obj = backend.OSFromDisk(name)
502     except errors.InvalidOS, err:
503       os_obj = objects.OS.FromInvalidOS(err)
504     return os_obj.ToDict()
505
506   # hooks -----------------------
507
508   @staticmethod
509   def perspective_hooks_runner(params):
510     """Run hook scripts.
511
512     """
513     hpath, phase, env = params
514     hr = backend.HooksRunner()
515     return hr.RunHooks(hpath, phase, env)
516
517   # iallocator -----------------
518
519   @staticmethod
520   def perspective_iallocator_runner(params):
521     """Run an iallocator script.
522
523     """
524     name, idata = params
525     iar = backend.IAllocatorRunner()
526     return iar.Run(name, idata)
527
528   # test -----------------------
529
530   @staticmethod
531   def perspective_test_delay(params):
532     """Run test delay.
533
534     """
535     duration = params[0]
536     return utils.TestDelay(duration)
537
538   # file storage ---------------
539
540   @staticmethod
541   def perspective_file_storage_dir_create(params):
542     """Create the file storage directory.
543
544     """
545     file_storage_dir = params[0]
546     return backend.CreateFileStorageDir(file_storage_dir)
547
548   @staticmethod
549   def perspective_file_storage_dir_remove(params):
550     """Remove the file storage directory.
551
552     """
553     file_storage_dir = params[0]
554     return backend.RemoveFileStorageDir(file_storage_dir)
555
556   @staticmethod
557   def perspective_file_storage_dir_rename(params):
558     """Rename the file storage directory.
559
560     """
561     old_file_storage_dir = params[0]
562     new_file_storage_dir = params[1]
563     return backend.RenameFileStorageDir(old_file_storage_dir,
564                                         new_file_storage_dir)
565
566   # jobs ------------------------
567
568   @staticmethod
569   @_RequireJobQueueLock
570   def perspective_jobqueue_update(params):
571     """Update job queue.
572
573     """
574     (file_name, content) = params
575     return backend.JobQueueUpdate(file_name, content)
576
577   @staticmethod
578   @_RequireJobQueueLock
579   def perspective_jobqueue_purge(params):
580     """Purge job queue.
581
582     """
583     return backend.JobQueuePurge()
584
585   @staticmethod
586   @_RequireJobQueueLock
587   def perspective_jobqueue_rename(params):
588     """Rename a job queue file.
589
590     """
591     (old, new) = params
592
593     return backend.JobQueueRename(old, new)
594
595
596 class NodeDaemonHttpServer(http.HTTPServer):
597   def __init__(self, server_address):
598     http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
599     self.noded_pid = os.getpid()
600
601   def serve_forever(self):
602     """Handle requests until told to quit."""
603     sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
604     try:
605       while not sighandler.called:
606         self.handle_request()
607       # TODO: There could be children running at this point
608     finally:
609       sighandler.Reset()
610
611
612 class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
613   """Forking HTTP Server.
614
615   This inherits from ForkingMixIn and HTTPServer in order to fork for each
616   request we handle. This allows more requests to be handled concurrently.
617
618   """
619
620
621 def ParseOptions():
622   """Parse the command line options.
623
624   Returns:
625     (options, args) as from OptionParser.parse_args()
626
627   """
628   parser = OptionParser(description="Ganeti node daemon",
629                         usage="%prog [-f] [-d]",
630                         version="%%prog (ganeti) %s" %
631                         constants.RELEASE_VERSION)
632
633   parser.add_option("-f", "--foreground", dest="fork",
634                     help="Don't detach from the current terminal",
635                     default=True, action="store_false")
636   parser.add_option("-d", "--debug", dest="debug",
637                     help="Enable some debug messages",
638                     default=False, action="store_true")
639   options, args = parser.parse_args()
640   return options, args
641
642
643 def main():
644   """Main function for the node daemon.
645
646   """
647   global queue_lock
648
649   options, args = ParseOptions()
650   utils.debug = options.debug
651   for fname in (constants.SSL_CERT_FILE,):
652     if not os.path.isfile(fname):
653       print "config %s not there, will not run." % fname
654       sys.exit(5)
655
656   try:
657     ss = ssconf.SimpleStore()
658     port = ss.GetNodeDaemonPort()
659     pwdata = ss.GetNodeDaemonPassword()
660   except errors.ConfigurationError, err:
661     print "Cluster configuration incomplete: '%s'" % str(err)
662     sys.exit(5)
663
664   # create the various SUB_RUN_DIRS, if not existing, so that we handle the
665   # situation where RUN_DIR is tmpfs
666   for dir_name in constants.SUB_RUN_DIRS:
667     if not os.path.exists(dir_name):
668       try:
669         os.mkdir(dir_name, 0755)
670       except EnvironmentError, err:
671         if err.errno != errno.EEXIST:
672           print ("Node setup wrong, cannot create directory %s: %s" %
673                  (dir_name, err))
674           sys.exit(5)
675     if not os.path.isdir(dir_name):
676       print ("Node setup wrong, %s is not a directory" % dir_name)
677       sys.exit(5)
678
679   # become a daemon
680   if options.fork:
681     utils.Daemonize(logfile=constants.LOG_NODESERVER)
682
683   utils.WritePidFile(constants.NODED_PID)
684
685   logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
686                       stderr_logging=not options.fork)
687   logging.info("ganeti node daemon startup")
688
689   # Prepare job queue
690   queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
691
692   if options.fork:
693     server = ForkingHTTPServer(('', port))
694   else:
695     server = NodeDaemonHttpServer(('', port))
696
697   try:
698     server.serve_forever()
699   finally:
700     utils.RemovePidFile(constants.NODED_PID)
701
702
703 if __name__ == '__main__':
704   main()