ganeti-noded: quit on QuitGanetiException
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import BaseHTTPServer
31 import simplejson
32 import errno
33
34 from optparse import OptionParser
35
36
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import ssconf
43 from ganeti import utils
44
45
46 class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
47   """The server implementation.
48
49   This class holds all methods exposed over the RPC interface.
50
51   """
52   def do_PUT(self):
53     """Handle a post request.
54
55     """
56     path = self.path
57     if path.startswith("/"):
58       path = path[1:]
59     mname = "perspective_%s" % path
60     if not hasattr(self, mname):
61       self.send_error(404)
62       return False
63
64     method = getattr(self, mname)
65     try:
66       body_length = int(self.headers.get('Content-Length', '0'))
67     except ValueError:
68       self.send_error(400, 'No Content-Length header or invalid format')
69       return False
70
71     try:
72       body = self.rfile.read(body_length)
73     except socket.error, err:
74       logger.Error("Socket error while reading: %s" % str(err))
75       return
76     try:
77       params = simplejson.loads(body)
78       result = method(params)
79       payload = simplejson.dumps(result)
80     except errors.QuitGanetiException, err:
81       global _EXIT_GANETI_NODED
82       _EXIT_GANETI_NODED = True
83       if isinstance(err, tuple) and len(err) == 2:
84         if err[0]:
85           self.send_error(500, "Error: %s" % str(err[1]))
86         else:
87           payload = simplejson.dumps(err[1])
88       else:
89         self.log_message('GanetiQuitException Usage Error')
90         self.send_error(500, "Error: %s" % str(err))
91     except Exception, err:
92       self.send_error(500, "Error: %s" % str(err))
93       return False
94     self.send_response(200)
95     self.send_header('Content-Length', str(len(payload)))
96     self.end_headers()
97     self.wfile.write(payload)
98     return True
99
100   def log_message(self, format, *args):
101     """Log a request to the log.
102
103     This is the same as the parent, we just log somewhere else.
104
105     """
106     msg = ("%s - - [%s] %s" %
107            (self.address_string(),
108             self.log_date_time_string(),
109             format % args))
110     logger.Debug(msg)
111
112   # the new block devices  --------------------------
113
114   @staticmethod
115   def perspective_blockdev_create(params):
116     """Create a block device.
117
118     """
119     bdev_s, size, owner, on_primary, info = params
120     bdev = objects.Disk.FromDict(bdev_s)
121     if bdev is None:
122       raise ValueError("can't unserialize data!")
123     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
124
125   @staticmethod
126   def perspective_blockdev_remove(params):
127     """Remove a block device.
128
129     """
130     bdev_s = params[0]
131     bdev = objects.Disk.FromDict(bdev_s)
132     return backend.RemoveBlockDevice(bdev)
133
134   @staticmethod
135   def perspective_blockdev_rename(params):
136     """Remove a block device.
137
138     """
139     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
140     return backend.RenameBlockDevices(devlist)
141
142   @staticmethod
143   def perspective_blockdev_assemble(params):
144     """Assemble a block device.
145
146     """
147     bdev_s, owner, on_primary = params
148     bdev = objects.Disk.FromDict(bdev_s)
149     if bdev is None:
150       raise ValueError("can't unserialize data!")
151     return backend.AssembleBlockDevice(bdev, owner, on_primary)
152
153   @staticmethod
154   def perspective_blockdev_shutdown(params):
155     """Shutdown a block device.
156
157     """
158     bdev_s = params[0]
159     bdev = objects.Disk.FromDict(bdev_s)
160     if bdev is None:
161       raise ValueError("can't unserialize data!")
162     return backend.ShutdownBlockDevice(bdev)
163
164   @staticmethod
165   def perspective_blockdev_addchildren(params):
166     """Add a child to a mirror device.
167
168     Note: this is only valid for mirror devices. It's the caller's duty
169     to send a correct disk, otherwise we raise an error.
170
171     """
172     bdev_s, ndev_s = params
173     bdev = objects.Disk.FromDict(bdev_s)
174     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
175     if bdev is None or ndevs.count(None) > 0:
176       raise ValueError("can't unserialize data!")
177     return backend.MirrorAddChildren(bdev, ndevs)
178
179   @staticmethod
180   def perspective_blockdev_removechildren(params):
181     """Remove a child from a mirror device.
182
183     This is only valid for mirror devices, of course. It's the callers
184     duty to send a correct disk, otherwise we raise an error.
185
186     """
187     bdev_s, ndev_s = params
188     bdev = objects.Disk.FromDict(bdev_s)
189     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
190     if bdev is None or ndevs.count(None) > 0:
191       raise ValueError("can't unserialize data!")
192     return backend.MirrorRemoveChildren(bdev, ndevs)
193
194   @staticmethod
195   def perspective_blockdev_getmirrorstatus(params):
196     """Return the mirror status for a list of disks.
197
198     """
199     disks = [objects.Disk.FromDict(dsk_s)
200             for dsk_s in params]
201     return backend.GetMirrorStatus(disks)
202
203   @staticmethod
204   def perspective_blockdev_find(params):
205     """Expose the FindBlockDevice functionality for a disk.
206
207     This will try to find but not activate a disk.
208
209     """
210     disk = objects.Disk.FromDict(params[0])
211     return backend.FindBlockDevice(disk)
212
213   @staticmethod
214   def perspective_blockdev_snapshot(params):
215     """Create a snapshot device.
216
217     Note that this is only valid for LVM disks, if we get passed
218     something else we raise an exception. The snapshot device can be
219     remove by calling the generic block device remove call.
220
221     """
222     cfbd = objects.Disk.FromDict(params[0])
223     return backend.SnapshotBlockDevice(cfbd)
224
225   @staticmethod
226   def perspective_blockdev_grow(params):
227     """Grow a stack of devices.
228
229     """
230     cfbd = objects.Disk.FromDict(params[0])
231     amount = params[1]
232     return backend.GrowBlockDevice(cfbd, amount)
233
234   @staticmethod
235   def perspective_blockdev_close(params):
236     """Closes the given block devices.
237
238     """
239     disks = [objects.Disk.FromDict(cf) for cf in params]
240     return backend.CloseBlockDevices(disks)
241
242   # export/import  --------------------------
243
244   @staticmethod
245   def perspective_snapshot_export(params):
246     """Export a given snapshot.
247
248     """
249     disk = objects.Disk.FromDict(params[0])
250     dest_node = params[1]
251     instance = objects.Instance.FromDict(params[2])
252     return backend.ExportSnapshot(disk, dest_node, instance)
253
254   @staticmethod
255   def perspective_finalize_export(params):
256     """Expose the finalize export functionality.
257
258     """
259     instance = objects.Instance.FromDict(params[0])
260     snap_disks = [objects.Disk.FromDict(str_data)
261                   for str_data in params[1]]
262     return backend.FinalizeExport(instance, snap_disks)
263
264   @staticmethod
265   def perspective_export_info(params):
266     """Query information about an existing export on this node.
267
268     The given path may not contain an export, in which case we return
269     None.
270
271     """
272     path = params[0]
273     einfo = backend.ExportInfo(path)
274     if einfo is None:
275       return einfo
276     return einfo.Dumps()
277
278   @staticmethod
279   def perspective_export_list(params):
280     """List the available exports on this node.
281
282     Note that as opposed to export_info, which may query data about an
283     export in any path, this only queries the standard Ganeti path
284     (constants.EXPORT_DIR).
285
286     """
287     return backend.ListExports()
288
289   @staticmethod
290   def perspective_export_remove(params):
291     """Remove an export.
292
293     """
294     export = params[0]
295     return backend.RemoveExport(export)
296
297   # volume  --------------------------
298
299   @staticmethod
300   def perspective_volume_list(params):
301     """Query the list of logical volumes in a given volume group.
302
303     """
304     vgname = params[0]
305     return backend.GetVolumeList(vgname)
306
307   @staticmethod
308   def perspective_vg_list(params):
309     """Query the list of volume groups.
310
311     """
312     return backend.ListVolumeGroups()
313
314   # bridge  --------------------------
315
316   @staticmethod
317   def perspective_bridges_exist(params):
318     """Check if all bridges given exist on this node.
319
320     """
321     bridges_list = params[0]
322     return backend.BridgesExist(bridges_list)
323
324   # instance  --------------------------
325
326   @staticmethod
327   def perspective_instance_os_add(params):
328     """Install an OS on a given instance.
329
330     """
331     inst_s, os_disk, swap_disk = params
332     inst = objects.Instance.FromDict(inst_s)
333     return backend.AddOSToInstance(inst, os_disk, swap_disk)
334
335   @staticmethod
336   def perspective_instance_run_rename(params):
337     """Runs the OS rename script for an instance.
338
339     """
340     inst_s, old_name, os_disk, swap_disk = params
341     inst = objects.Instance.FromDict(inst_s)
342     return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
343
344   @staticmethod
345   def perspective_instance_os_import(params):
346     """Run the import function of an OS onto a given instance.
347
348     """
349     inst_s, os_disk, swap_disk, src_node, src_image = params
350     inst = objects.Instance.FromDict(inst_s)
351     return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
352                                         src_node, src_image)
353
354   @staticmethod
355   def perspective_instance_shutdown(params):
356     """Shutdown an instance.
357
358     """
359     instance = objects.Instance.FromDict(params[0])
360     return backend.ShutdownInstance(instance)
361
362   @staticmethod
363   def perspective_instance_start(params):
364     """Start an instance.
365
366     """
367     instance = objects.Instance.FromDict(params[0])
368     extra_args = params[1]
369     return backend.StartInstance(instance, extra_args)
370
371   @staticmethod
372   def perspective_instance_migrate(params):
373     """Migrates an instance.
374
375     """
376     instance, target, live = params
377     return backend.MigrateInstance(instance, target, live)
378
379   @staticmethod
380   def perspective_instance_reboot(params):
381     """Reboot an instance.
382
383     """
384     instance = objects.Instance.FromDict(params[0])
385     reboot_type = params[1]
386     extra_args = params[2]
387     return backend.RebootInstance(instance, reboot_type, extra_args)
388
389   @staticmethod
390   def perspective_instance_info(params):
391     """Query instance information.
392
393     """
394     return backend.GetInstanceInfo(params[0])
395
396   @staticmethod
397   def perspective_all_instances_info(params):
398     """Query information about all instances.
399
400     """
401     return backend.GetAllInstancesInfo()
402
403   @staticmethod
404   def perspective_instance_list(params):
405     """Query the list of running instances.
406
407     """
408     return backend.GetInstanceList()
409
410   # node --------------------------
411
412   @staticmethod
413   def perspective_node_tcp_ping(params):
414     """Do a TcpPing on the remote node.
415
416     """
417     return utils.TcpPing(params[1], params[2], timeout=params[3],
418                          live_port_needed=params[4], source=params[0])
419
420   @staticmethod
421   def perspective_node_info(params):
422     """Query node information.
423
424     """
425     vgname = params[0]
426     return backend.GetNodeInfo(vgname)
427
428   @staticmethod
429   def perspective_node_add(params):
430     """Complete the registration of this node in the cluster.
431
432     """
433     return backend.AddNode(params[0], params[1], params[2],
434                            params[3], params[4], params[5])
435
436   @staticmethod
437   def perspective_node_verify(params):
438     """Run a verify sequence on this node.
439
440     """
441     return backend.VerifyNode(params[0])
442
443   @staticmethod
444   def perspective_node_start_master(params):
445     """Promote this node to master status.
446
447     """
448     return backend.StartMaster()
449
450   @staticmethod
451   def perspective_node_stop_master(params):
452     """Demote this node from master status.
453
454     """
455     return backend.StopMaster()
456
457   @staticmethod
458   def perspective_node_leave_cluster(params):
459     """Cleanup after leaving a cluster.
460
461     """
462     return backend.LeaveCluster()
463
464   @staticmethod
465   def perspective_node_volumes(params):
466     """Query the list of all logical volume groups.
467
468     """
469     return backend.NodeVolumes()
470
471   # cluster --------------------------
472
473   @staticmethod
474   def perspective_version(params):
475     """Query version information.
476
477     """
478     return constants.PROTOCOL_VERSION
479
480   @staticmethod
481   def perspective_upload_file(params):
482     """Upload a file.
483
484     Note that the backend implementation imposes strict rules on which
485     files are accepted.
486
487     """
488     return backend.UploadFile(*params)
489
490
491   # os -----------------------
492
493   @staticmethod
494   def perspective_os_diagnose(params):
495     """Query detailed information about existing OSes.
496
497     """
498     return [os.ToDict() for os in backend.DiagnoseOS()]
499
500   @staticmethod
501   def perspective_os_get(params):
502     """Query information about a given OS.
503
504     """
505     name = params[0]
506     try:
507       os_obj = backend.OSFromDisk(name)
508     except errors.InvalidOS, err:
509       os_obj = objects.OS.FromInvalidOS(err)
510     return os_obj.ToDict()
511
512   # hooks -----------------------
513
514   @staticmethod
515   def perspective_hooks_runner(params):
516     """Run hook scripts.
517
518     """
519     hpath, phase, env = params
520     hr = backend.HooksRunner()
521     return hr.RunHooks(hpath, phase, env)
522
523   # iallocator -----------------
524
525   @staticmethod
526   def perspective_iallocator_runner(params):
527     """Run an iallocator script.
528
529     """
530     name, idata = params
531     iar = backend.IAllocatorRunner()
532     return iar.Run(name, idata)
533
534   # test -----------------------
535
536   @staticmethod
537   def perspective_test_delay(params):
538     """Run test delay.
539
540     """
541     duration = params[0]
542     return utils.TestDelay(duration)
543
544   @staticmethod
545   def perspective_file_storage_dir_create(params):
546     """Create the file storage directory.
547
548     """
549     file_storage_dir = params[0]
550     return backend.CreateFileStorageDir(file_storage_dir)
551
552   @staticmethod
553   def perspective_file_storage_dir_remove(params):
554     """Remove the file storage directory.
555
556     """
557     file_storage_dir = params[0]
558     return backend.RemoveFileStorageDir(file_storage_dir)
559
560   @staticmethod
561   def perspective_file_storage_dir_rename(params):
562     """Rename the file storage directory.
563
564     """
565     old_file_storage_dir = params[0]
566     new_file_storage_dir = params[1]
567     return backend.RenameFileStorageDir(old_file_storage_dir,
568                                         new_file_storage_dir)
569
570
571 def ParseOptions():
572   """Parse the command line options.
573
574   Returns:
575     (options, args) as from OptionParser.parse_args()
576
577   """
578   parser = OptionParser(description="Ganeti node daemon",
579                         usage="%prog [-f] [-d]",
580                         version="%%prog (ganeti) %s" %
581                         constants.RELEASE_VERSION)
582
583   parser.add_option("-f", "--foreground", dest="fork",
584                     help="Don't detach from the current terminal",
585                     default=True, action="store_false")
586   parser.add_option("-d", "--debug", dest="debug",
587                     help="Enable some debug messages",
588                     default=False, action="store_true")
589   options, args = parser.parse_args()
590   return options, args
591
592
593 def main():
594   """Main function for the node daemon.
595
596   """
597   options, args = ParseOptions()
598   utils.debug = options.debug
599   for fname in (constants.SSL_CERT_FILE,):
600     if not os.path.isfile(fname):
601       print "config %s not there, will not run." % fname
602       sys.exit(5)
603
604   try:
605     ss = ssconf.SimpleStore()
606     port = ss.GetNodeDaemonPort()
607     pwdata = ss.GetNodeDaemonPassword()
608   except errors.ConfigurationError, err:
609     print "Cluster configuration incomplete: '%s'" % str(err)
610     sys.exit(5)
611
612   # create /var/run/ganeti if not existing, in order to take care of
613   # tmpfs /var/run
614   if not os.path.exists(constants.BDEV_CACHE_DIR):
615     try:
616       os.mkdir(constants.BDEV_CACHE_DIR, 0755)
617     except EnvironmentError, err:
618       if err.errno != errno.EEXIST:
619         print ("Node setup wrong, cannot create directory %s: %s" %
620                (constants.BDEV_CACHE_DIR, err))
621         sys.exit(5)
622   if not os.path.isdir(constants.BDEV_CACHE_DIR):
623     print ("Node setup wrong, %s is not a directory" %
624            constants.BDEV_CACHE_DIR)
625     sys.exit(5)
626
627   # become a daemon
628   if options.fork:
629     utils.Daemonize(logfile=constants.LOG_NODESERVER)
630
631   logger.SetupLogging(program="ganeti-noded", debug=options.debug)
632
633   global _EXIT_GANETI_NODED
634   _EXIT_GANETI_NODED = False
635
636   httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject)
637   while (not _EXIT_GANETI_NODED):
638     httpd.handle_request()
639
640
641 if __name__ == '__main__':
642   main()