Remove the logger.py module
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
45
46
47 queue_lock = None
48
49
50 def _RequireJobQueueLock(fn):
51   """Decorator for job queue manipulating functions.
52
53   """
54   QUEUE_LOCK_TIMEOUT = 10
55
56   def wrapper(*args, **kwargs):
57     # Locking in exclusive, blocking mode because there could be several
58     # children running at the same time. Waiting up to 10 seconds.
59     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
60     try:
61       return fn(*args, **kwargs)
62     finally:
63       queue_lock.Unlock()
64
65   return wrapper
66
67
68 class NodeHttpServer(http.HttpServer):
69   """The server implementation.
70
71   This class holds all methods exposed over the RPC interface.
72
73   """
74   def __init__(self, *args, **kwargs):
75     http.HttpServer.__init__(self, *args, **kwargs)
76     self.noded_pid = os.getpid()
77
78   def HandleRequest(self, req):
79     """Handle a request.
80
81     """
82     if req.request_method.upper() != "PUT":
83       raise http.HTTPBadRequest()
84
85     path = req.request_path
86     if path.startswith("/"):
87       path = path[1:]
88
89     method = getattr(self, "perspective_%s" % path, None)
90     if method is None:
91       raise http.HTTPNotFound()
92
93     try:
94       try:
95         return method(req.request_post_data)
96       except:
97         logging.exception("Error in RPC call")
98         raise
99     except errors.QuitGanetiException, err:
100       # Tell parent to quit
101       os.kill(self.noded_pid, signal.SIGTERM)
102
103   # the new block devices  --------------------------
104
105   @staticmethod
106   def perspective_blockdev_create(params):
107     """Create a block device.
108
109     """
110     bdev_s, size, owner, on_primary, info = params
111     bdev = objects.Disk.FromDict(bdev_s)
112     if bdev is None:
113       raise ValueError("can't unserialize data!")
114     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
115
116   @staticmethod
117   def perspective_blockdev_remove(params):
118     """Remove a block device.
119
120     """
121     bdev_s = params[0]
122     bdev = objects.Disk.FromDict(bdev_s)
123     return backend.RemoveBlockDevice(bdev)
124
125   @staticmethod
126   def perspective_blockdev_rename(params):
127     """Remove a block device.
128
129     """
130     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
131     return backend.RenameBlockDevices(devlist)
132
133   @staticmethod
134   def perspective_blockdev_assemble(params):
135     """Assemble a block device.
136
137     """
138     bdev_s, owner, on_primary = params
139     bdev = objects.Disk.FromDict(bdev_s)
140     if bdev is None:
141       raise ValueError("can't unserialize data!")
142     return backend.AssembleBlockDevice(bdev, owner, on_primary)
143
144   @staticmethod
145   def perspective_blockdev_shutdown(params):
146     """Shutdown a block device.
147
148     """
149     bdev_s = params[0]
150     bdev = objects.Disk.FromDict(bdev_s)
151     if bdev is None:
152       raise ValueError("can't unserialize data!")
153     return backend.ShutdownBlockDevice(bdev)
154
155   @staticmethod
156   def perspective_blockdev_addchildren(params):
157     """Add a child to a mirror device.
158
159     Note: this is only valid for mirror devices. It's the caller's duty
160     to send a correct disk, otherwise we raise an error.
161
162     """
163     bdev_s, ndev_s = params
164     bdev = objects.Disk.FromDict(bdev_s)
165     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
166     if bdev is None or ndevs.count(None) > 0:
167       raise ValueError("can't unserialize data!")
168     return backend.MirrorAddChildren(bdev, ndevs)
169
170   @staticmethod
171   def perspective_blockdev_removechildren(params):
172     """Remove a child from a mirror device.
173
174     This is only valid for mirror devices, of course. It's the callers
175     duty to send a correct disk, otherwise we raise an error.
176
177     """
178     bdev_s, ndev_s = params
179     bdev = objects.Disk.FromDict(bdev_s)
180     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
181     if bdev is None or ndevs.count(None) > 0:
182       raise ValueError("can't unserialize data!")
183     return backend.MirrorRemoveChildren(bdev, ndevs)
184
185   @staticmethod
186   def perspective_blockdev_getmirrorstatus(params):
187     """Return the mirror status for a list of disks.
188
189     """
190     disks = [objects.Disk.FromDict(dsk_s)
191             for dsk_s in params]
192     return backend.GetMirrorStatus(disks)
193
194   @staticmethod
195   def perspective_blockdev_find(params):
196     """Expose the FindBlockDevice functionality for a disk.
197
198     This will try to find but not activate a disk.
199
200     """
201     disk = objects.Disk.FromDict(params[0])
202     return backend.FindBlockDevice(disk)
203
204   @staticmethod
205   def perspective_blockdev_snapshot(params):
206     """Create a snapshot device.
207
208     Note that this is only valid for LVM disks, if we get passed
209     something else we raise an exception. The snapshot device can be
210     remove by calling the generic block device remove call.
211
212     """
213     cfbd = objects.Disk.FromDict(params[0])
214     return backend.SnapshotBlockDevice(cfbd)
215
216   @staticmethod
217   def perspective_blockdev_grow(params):
218     """Grow a stack of devices.
219
220     """
221     cfbd = objects.Disk.FromDict(params[0])
222     amount = params[1]
223     return backend.GrowBlockDevice(cfbd, amount)
224
225   @staticmethod
226   def perspective_blockdev_close(params):
227     """Closes the given block devices.
228
229     """
230     disks = [objects.Disk.FromDict(cf) for cf in params]
231     return backend.CloseBlockDevices(disks)
232
233   # export/import  --------------------------
234
235   @staticmethod
236   def perspective_snapshot_export(params):
237     """Export a given snapshot.
238
239     """
240     disk = objects.Disk.FromDict(params[0])
241     dest_node = params[1]
242     instance = objects.Instance.FromDict(params[2])
243     cluster_name = params[3]
244     return backend.ExportSnapshot(disk, dest_node, instance, cluster_name)
245
246   @staticmethod
247   def perspective_finalize_export(params):
248     """Expose the finalize export functionality.
249
250     """
251     instance = objects.Instance.FromDict(params[0])
252     snap_disks = [objects.Disk.FromDict(str_data)
253                   for str_data in params[1]]
254     return backend.FinalizeExport(instance, snap_disks)
255
256   @staticmethod
257   def perspective_export_info(params):
258     """Query information about an existing export on this node.
259
260     The given path may not contain an export, in which case we return
261     None.
262
263     """
264     path = params[0]
265     einfo = backend.ExportInfo(path)
266     if einfo is None:
267       return einfo
268     return einfo.Dumps()
269
270   @staticmethod
271   def perspective_export_list(params):
272     """List the available exports on this node.
273
274     Note that as opposed to export_info, which may query data about an
275     export in any path, this only queries the standard Ganeti path
276     (constants.EXPORT_DIR).
277
278     """
279     return backend.ListExports()
280
281   @staticmethod
282   def perspective_export_remove(params):
283     """Remove an export.
284
285     """
286     export = params[0]
287     return backend.RemoveExport(export)
288
289   # volume  --------------------------
290
291   @staticmethod
292   def perspective_volume_list(params):
293     """Query the list of logical volumes in a given volume group.
294
295     """
296     vgname = params[0]
297     return backend.GetVolumeList(vgname)
298
299   @staticmethod
300   def perspective_vg_list(params):
301     """Query the list of volume groups.
302
303     """
304     return backend.ListVolumeGroups()
305
306   # bridge  --------------------------
307
308   @staticmethod
309   def perspective_bridges_exist(params):
310     """Check if all bridges given exist on this node.
311
312     """
313     bridges_list = params[0]
314     return backend.BridgesExist(bridges_list)
315
316   # instance  --------------------------
317
318   @staticmethod
319   def perspective_instance_os_add(params):
320     """Install an OS on a given instance.
321
322     """
323     inst_s = params[0]
324     inst = objects.Instance.FromDict(inst_s)
325     return backend.AddOSToInstance(inst)
326
327   @staticmethod
328   def perspective_instance_run_rename(params):
329     """Runs the OS rename script for an instance.
330
331     """
332     inst_s, old_name = params
333     inst = objects.Instance.FromDict(inst_s)
334     return backend.RunRenameInstance(inst, old_name)
335
336   @staticmethod
337   def perspective_instance_os_import(params):
338     """Run the import function of an OS onto a given instance.
339
340     """
341     inst_s, os_disk, swap_disk, src_node, src_image, cluster_name = params
342     inst = objects.Instance.FromDict(inst_s)
343     return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
344                                         src_node, src_image, cluster_name)
345
346   @staticmethod
347   def perspective_instance_shutdown(params):
348     """Shutdown an instance.
349
350     """
351     instance = objects.Instance.FromDict(params[0])
352     return backend.ShutdownInstance(instance)
353
354   @staticmethod
355   def perspective_instance_start(params):
356     """Start an instance.
357
358     """
359     instance = objects.Instance.FromDict(params[0])
360     extra_args = params[1]
361     return backend.StartInstance(instance, extra_args)
362
363   @staticmethod
364   def perspective_instance_migrate(params):
365     """Migrates an instance.
366
367     """
368     instance, target, live = params
369     instance = objects.Instance.FromDict(instance)
370     return backend.MigrateInstance(instance, target, live)
371
372   @staticmethod
373   def perspective_instance_reboot(params):
374     """Reboot an instance.
375
376     """
377     instance = objects.Instance.FromDict(params[0])
378     reboot_type = params[1]
379     extra_args = params[2]
380     return backend.RebootInstance(instance, reboot_type, extra_args)
381
382   @staticmethod
383   def perspective_instance_info(params):
384     """Query instance information.
385
386     """
387     return backend.GetInstanceInfo(params[0], params[1])
388
389   @staticmethod
390   def perspective_all_instances_info(params):
391     """Query information about all instances.
392
393     """
394     return backend.GetAllInstancesInfo(params[0])
395
396   @staticmethod
397   def perspective_instance_list(params):
398     """Query the list of running instances.
399
400     """
401     return backend.GetInstanceList(params[0])
402
403   # node --------------------------
404
405   @staticmethod
406   def perspective_node_tcp_ping(params):
407     """Do a TcpPing on the remote node.
408
409     """
410     return utils.TcpPing(params[1], params[2], timeout=params[3],
411                          live_port_needed=params[4], source=params[0])
412
413   @staticmethod
414   def perspective_node_has_ip_address(params):
415     """Checks if a node has the given ip address.
416
417     """
418     return utils.OwnIpAddress(params[0])
419
420   @staticmethod
421   def perspective_node_info(params):
422     """Query node information.
423
424     """
425     vgname, hypervisor_type = params
426     return backend.GetNodeInfo(vgname, hypervisor_type)
427
428   @staticmethod
429   def perspective_node_add(params):
430     """Complete the registration of this node in the cluster.
431
432     """
433     return backend.AddNode(params[0], params[1], params[2],
434                            params[3], params[4], params[5])
435
436   @staticmethod
437   def perspective_node_verify(params):
438     """Run a verify sequence on this node.
439
440     """
441     return backend.VerifyNode(params[0], params[1])
442
443   @staticmethod
444   def perspective_node_start_master(params):
445     """Promote this node to master status.
446
447     """
448     return backend.StartMaster(params[0])
449
450   @staticmethod
451   def perspective_node_stop_master(params):
452     """Demote this node from master status.
453
454     """
455     return backend.StopMaster(params[0])
456
457   @staticmethod
458   def perspective_node_leave_cluster(params):
459     """Cleanup after leaving a cluster.
460
461     """
462     return backend.LeaveCluster()
463
464   @staticmethod
465   def perspective_node_volumes(params):
466     """Query the list of all logical volume groups.
467
468     """
469     return backend.NodeVolumes()
470
471   # cluster --------------------------
472
473   @staticmethod
474   def perspective_version(params):
475     """Query version information.
476
477     """
478     return constants.PROTOCOL_VERSION
479
480   @staticmethod
481   def perspective_upload_file(params):
482     """Upload a file.
483
484     Note that the backend implementation imposes strict rules on which
485     files are accepted.
486
487     """
488     return backend.UploadFile(*params)
489
490   @staticmethod
491   def perspective_master_info(params):
492     """Query master information.
493
494     """
495     return backend.GetMasterInfo()
496
497   # os -----------------------
498
499   @staticmethod
500   def perspective_os_diagnose(params):
501     """Query detailed information about existing OSes.
502
503     """
504     return [os.ToDict() for os in backend.DiagnoseOS()]
505
506   @staticmethod
507   def perspective_os_get(params):
508     """Query information about a given OS.
509
510     """
511     name = params[0]
512     try:
513       os_obj = backend.OSFromDisk(name)
514     except errors.InvalidOS, err:
515       os_obj = objects.OS.FromInvalidOS(err)
516     return os_obj.ToDict()
517
518   # hooks -----------------------
519
520   @staticmethod
521   def perspective_hooks_runner(params):
522     """Run hook scripts.
523
524     """
525     hpath, phase, env = params
526     hr = backend.HooksRunner()
527     return hr.RunHooks(hpath, phase, env)
528
529   # iallocator -----------------
530
531   @staticmethod
532   def perspective_iallocator_runner(params):
533     """Run an iallocator script.
534
535     """
536     name, idata = params
537     iar = backend.IAllocatorRunner()
538     return iar.Run(name, idata)
539
540   # test -----------------------
541
542   @staticmethod
543   def perspective_test_delay(params):
544     """Run test delay.
545
546     """
547     duration = params[0]
548     return utils.TestDelay(duration)
549
550   # file storage ---------------
551
552   @staticmethod
553   def perspective_file_storage_dir_create(params):
554     """Create the file storage directory.
555
556     """
557     file_storage_dir = params[0]
558     return backend.CreateFileStorageDir(file_storage_dir)
559
560   @staticmethod
561   def perspective_file_storage_dir_remove(params):
562     """Remove the file storage directory.
563
564     """
565     file_storage_dir = params[0]
566     return backend.RemoveFileStorageDir(file_storage_dir)
567
568   @staticmethod
569   def perspective_file_storage_dir_rename(params):
570     """Rename the file storage directory.
571
572     """
573     old_file_storage_dir = params[0]
574     new_file_storage_dir = params[1]
575     return backend.RenameFileStorageDir(old_file_storage_dir,
576                                         new_file_storage_dir)
577
578   # jobs ------------------------
579
580   @staticmethod
581   @_RequireJobQueueLock
582   def perspective_jobqueue_update(params):
583     """Update job queue.
584
585     """
586     (file_name, content) = params
587     return backend.JobQueueUpdate(file_name, content)
588
589   @staticmethod
590   @_RequireJobQueueLock
591   def perspective_jobqueue_purge(params):
592     """Purge job queue.
593
594     """
595     return backend.JobQueuePurge()
596
597   @staticmethod
598   @_RequireJobQueueLock
599   def perspective_jobqueue_rename(params):
600     """Rename a job queue file.
601
602     """
603     (old, new) = params
604
605     return backend.JobQueueRename(old, new)
606
607   @staticmethod
608   def perspective_jobqueue_set_drain(params):
609     """Set/unset the queue drain flag.
610
611     """
612     drain_flag = params[0]
613     return backend.JobQueueSetDrainFlag(drain_flag)
614
615
616   # hypervisor ---------------
617
618   @staticmethod
619   def perspective_hypervisor_validate_params(params):
620     """Validate the hypervisor parameters.
621
622     """
623     (hvname, hvparams) = params
624     return backend.ValidateHVParams(hvname, hvparams)
625
626
627 def ParseOptions():
628   """Parse the command line options.
629
630   Returns:
631     (options, args) as from OptionParser.parse_args()
632
633   """
634   parser = OptionParser(description="Ganeti node daemon",
635                         usage="%prog [-f] [-d]",
636                         version="%%prog (ganeti) %s" %
637                         constants.RELEASE_VERSION)
638
639   parser.add_option("-f", "--foreground", dest="fork",
640                     help="Don't detach from the current terminal",
641                     default=True, action="store_false")
642   parser.add_option("-d", "--debug", dest="debug",
643                     help="Enable some debug messages",
644                     default=False, action="store_true")
645   options, args = parser.parse_args()
646   return options, args
647
648
649 def main():
650   """Main function for the node daemon.
651
652   """
653   global queue_lock
654
655   options, args = ParseOptions()
656   utils.debug = options.debug
657   for fname in (constants.SSL_CERT_FILE,):
658     if not os.path.isfile(fname):
659       print "config %s not there, will not run." % fname
660       sys.exit(5)
661
662   try:
663     port = utils.GetNodeDaemonPort()
664     pwdata = utils.GetNodeDaemonPassword()
665   except errors.ConfigurationError, err:
666     print "Cluster configuration incomplete: '%s'" % str(err)
667     sys.exit(5)
668
669   # create the various SUB_RUN_DIRS, if not existing, so that we handle the
670   # situation where RUN_DIR is tmpfs
671   for dir_name in constants.SUB_RUN_DIRS:
672     if not os.path.exists(dir_name):
673       try:
674         os.mkdir(dir_name, 0755)
675       except EnvironmentError, err:
676         if err.errno != errno.EEXIST:
677           print ("Node setup wrong, cannot create directory %s: %s" %
678                  (dir_name, err))
679           sys.exit(5)
680     if not os.path.isdir(dir_name):
681       print ("Node setup wrong, %s is not a directory" % dir_name)
682       sys.exit(5)
683
684   # become a daemon
685   if options.fork:
686     utils.Daemonize(logfile=constants.LOG_NODESERVER)
687
688   utils.WritePidFile(constants.NODED_PID)
689   try:
690     utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
691                        stderr_logging=not options.fork)
692     logging.info("ganeti node daemon startup")
693
694     # Prepare job queue
695     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
696
697     mainloop = daemon.Mainloop()
698     server = NodeHttpServer(mainloop, ("", port))
699     server.Start()
700     try:
701       mainloop.Run()
702     finally:
703       server.Stop()
704   finally:
705     utils.RemovePidFile(constants.NODED_PID)
706
707
708 if __name__ == '__main__':
709   main()