Convert os_diagnose rpc to new style result
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
45
46 import ganeti.http.server
47
48
49 queue_lock = None
50
51
52 def _RequireJobQueueLock(fn):
53   """Decorator for job queue manipulating functions.
54
55   """
56   QUEUE_LOCK_TIMEOUT = 10
57
58   def wrapper(*args, **kwargs):
59     # Locking in exclusive, blocking mode because there could be several
60     # children running at the same time. Waiting up to 10 seconds.
61     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
62     try:
63       return fn(*args, **kwargs)
64     finally:
65       queue_lock.Unlock()
66
67   return wrapper
68
69
70 class NodeHttpServer(http.server.HttpServer):
71   """The server implementation.
72
73   This class holds all methods exposed over the RPC interface.
74
75   """
76   def __init__(self, *args, **kwargs):
77     http.server.HttpServer.__init__(self, *args, **kwargs)
78     self.noded_pid = os.getpid()
79
80   def HandleRequest(self, req):
81     """Handle a request.
82
83     """
84     if req.request_method.upper() != http.HTTP_PUT:
85       raise http.HttpBadRequest()
86
87     path = req.request_path
88     if path.startswith("/"):
89       path = path[1:]
90
91     method = getattr(self, "perspective_%s" % path, None)
92     if method is None:
93       raise http.HttpNotFound()
94
95     try:
96       return method(req.request_body)
97     except backend.RPCFail, err:
98       # our custom failure exception; str(err) works fine if the
99       # exception was constructed with a single argument, and in
100       # this case, err.message == err.args[0] == str(err)
101       return (False, str(err))
102     except errors.QuitGanetiException, err:
103       # Tell parent to quit
104       logging.info("Shutting down the node daemon, arguments: %s",
105                    str(err.args))
106       os.kill(self.noded_pid, signal.SIGTERM)
107       # And return the error's arguments, which must be already in
108       # correct tuple format
109       return err.args
110     except:
111       logging.exception("Error in RPC call")
112       raise
113
114   # the new block devices  --------------------------
115
116   @staticmethod
117   def perspective_blockdev_create(params):
118     """Create a block device.
119
120     """
121     bdev_s, size, owner, on_primary, info = params
122     bdev = objects.Disk.FromDict(bdev_s)
123     if bdev is None:
124       raise ValueError("can't unserialize data!")
125     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
126
127   @staticmethod
128   def perspective_blockdev_remove(params):
129     """Remove a block device.
130
131     """
132     bdev_s = params[0]
133     bdev = objects.Disk.FromDict(bdev_s)
134     return backend.BlockdevRemove(bdev)
135
136   @staticmethod
137   def perspective_blockdev_rename(params):
138     """Remove a block device.
139
140     """
141     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
142     return backend.BlockdevRename(devlist)
143
144   @staticmethod
145   def perspective_blockdev_assemble(params):
146     """Assemble a block device.
147
148     """
149     bdev_s, owner, on_primary = params
150     bdev = objects.Disk.FromDict(bdev_s)
151     if bdev is None:
152       raise ValueError("can't unserialize data!")
153     return backend.BlockdevAssemble(bdev, owner, on_primary)
154
155   @staticmethod
156   def perspective_blockdev_shutdown(params):
157     """Shutdown a block device.
158
159     """
160     bdev_s = params[0]
161     bdev = objects.Disk.FromDict(bdev_s)
162     if bdev is None:
163       raise ValueError("can't unserialize data!")
164     return backend.BlockdevShutdown(bdev)
165
166   @staticmethod
167   def perspective_blockdev_addchildren(params):
168     """Add a child to a mirror device.
169
170     Note: this is only valid for mirror devices. It's the caller's duty
171     to send a correct disk, otherwise we raise an error.
172
173     """
174     bdev_s, ndev_s = params
175     bdev = objects.Disk.FromDict(bdev_s)
176     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
177     if bdev is None or ndevs.count(None) > 0:
178       raise ValueError("can't unserialize data!")
179     return backend.BlockdevAddchildren(bdev, ndevs)
180
181   @staticmethod
182   def perspective_blockdev_removechildren(params):
183     """Remove a child from a mirror device.
184
185     This is only valid for mirror devices, of course. It's the callers
186     duty to send a correct disk, otherwise we raise an error.
187
188     """
189     bdev_s, ndev_s = params
190     bdev = objects.Disk.FromDict(bdev_s)
191     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
192     if bdev is None or ndevs.count(None) > 0:
193       raise ValueError("can't unserialize data!")
194     return backend.BlockdevRemovechildren(bdev, ndevs)
195
196   @staticmethod
197   def perspective_blockdev_getmirrorstatus(params):
198     """Return the mirror status for a list of disks.
199
200     """
201     disks = [objects.Disk.FromDict(dsk_s)
202             for dsk_s in params]
203     return backend.BlockdevGetmirrorstatus(disks)
204
205   @staticmethod
206   def perspective_blockdev_find(params):
207     """Expose the FindBlockDevice functionality for a disk.
208
209     This will try to find but not activate a disk.
210
211     """
212     disk = objects.Disk.FromDict(params[0])
213     return backend.BlockdevFind(disk)
214
215   @staticmethod
216   def perspective_blockdev_snapshot(params):
217     """Create a snapshot device.
218
219     Note that this is only valid for LVM disks, if we get passed
220     something else we raise an exception. The snapshot device can be
221     remove by calling the generic block device remove call.
222
223     """
224     cfbd = objects.Disk.FromDict(params[0])
225     return backend.BlockdevSnapshot(cfbd)
226
227   @staticmethod
228   def perspective_blockdev_grow(params):
229     """Grow a stack of devices.
230
231     """
232     cfbd = objects.Disk.FromDict(params[0])
233     amount = params[1]
234     return backend.BlockdevGrow(cfbd, amount)
235
236   @staticmethod
237   def perspective_blockdev_close(params):
238     """Closes the given block devices.
239
240     """
241     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
242     return backend.BlockdevClose(params[0], disks)
243
244   # blockdev/drbd specific methods ----------
245
246   @staticmethod
247   def perspective_drbd_disconnect_net(params):
248     """Disconnects the network connection of drbd disks.
249
250     Note that this is only valid for drbd disks, so the members of the
251     disk list must all be drbd devices.
252
253     """
254     nodes_ip, disks = params
255     disks = [objects.Disk.FromDict(cf) for cf in disks]
256     return backend.DrbdDisconnectNet(nodes_ip, disks)
257
258   @staticmethod
259   def perspective_drbd_attach_net(params):
260     """Attaches the network connection of drbd disks.
261
262     Note that this is only valid for drbd disks, so the members of the
263     disk list must all be drbd devices.
264
265     """
266     nodes_ip, disks, instance_name, multimaster = params
267     disks = [objects.Disk.FromDict(cf) for cf in disks]
268     return backend.DrbdAttachNet(nodes_ip, disks,
269                                      instance_name, multimaster)
270
271   @staticmethod
272   def perspective_drbd_wait_sync(params):
273     """Wait until DRBD disks are synched.
274
275     Note that this is only valid for drbd disks, so the members of the
276     disk list must all be drbd devices.
277
278     """
279     nodes_ip, disks = params
280     disks = [objects.Disk.FromDict(cf) for cf in disks]
281     return backend.DrbdWaitSync(nodes_ip, disks)
282
283   # export/import  --------------------------
284
285   @staticmethod
286   def perspective_snapshot_export(params):
287     """Export a given snapshot.
288
289     """
290     disk = objects.Disk.FromDict(params[0])
291     dest_node = params[1]
292     instance = objects.Instance.FromDict(params[2])
293     cluster_name = params[3]
294     dev_idx = params[4]
295     return backend.ExportSnapshot(disk, dest_node, instance,
296                                   cluster_name, dev_idx)
297
298   @staticmethod
299   def perspective_finalize_export(params):
300     """Expose the finalize export functionality.
301
302     """
303     instance = objects.Instance.FromDict(params[0])
304     snap_disks = [objects.Disk.FromDict(str_data)
305                   for str_data in params[1]]
306     return backend.FinalizeExport(instance, snap_disks)
307
308   @staticmethod
309   def perspective_export_info(params):
310     """Query information about an existing export on this node.
311
312     The given path may not contain an export, in which case we return
313     None.
314
315     """
316     path = params[0]
317     return backend.ExportInfo(path)
318
319   @staticmethod
320   def perspective_export_list(params):
321     """List the available exports on this node.
322
323     Note that as opposed to export_info, which may query data about an
324     export in any path, this only queries the standard Ganeti path
325     (constants.EXPORT_DIR).
326
327     """
328     return backend.ListExports()
329
330   @staticmethod
331   def perspective_export_remove(params):
332     """Remove an export.
333
334     """
335     export = params[0]
336     return backend.RemoveExport(export)
337
338   # volume  --------------------------
339
340   @staticmethod
341   def perspective_volume_list(params):
342     """Query the list of logical volumes in a given volume group.
343
344     """
345     vgname = params[0]
346     return True, backend.GetVolumeList(vgname)
347
348   @staticmethod
349   def perspective_vg_list(params):
350     """Query the list of volume groups.
351
352     """
353     return backend.ListVolumeGroups()
354
355   # bridge  --------------------------
356
357   @staticmethod
358   def perspective_bridges_exist(params):
359     """Check if all bridges given exist on this node.
360
361     """
362     bridges_list = params[0]
363     return backend.BridgesExist(bridges_list)
364
365   # instance  --------------------------
366
367   @staticmethod
368   def perspective_instance_os_add(params):
369     """Install an OS on a given instance.
370
371     """
372     inst_s = params[0]
373     inst = objects.Instance.FromDict(inst_s)
374     reinstall = params[1]
375     return backend.InstanceOsAdd(inst, reinstall)
376
377   @staticmethod
378   def perspective_instance_run_rename(params):
379     """Runs the OS rename script for an instance.
380
381     """
382     inst_s, old_name = params
383     inst = objects.Instance.FromDict(inst_s)
384     return backend.RunRenameInstance(inst, old_name)
385
386   @staticmethod
387   def perspective_instance_os_import(params):
388     """Run the import function of an OS onto a given instance.
389
390     """
391     inst_s, src_node, src_images, cluster_name = params
392     inst = objects.Instance.FromDict(inst_s)
393     return backend.ImportOSIntoInstance(inst, src_node, src_images,
394                                         cluster_name)
395
396   @staticmethod
397   def perspective_instance_shutdown(params):
398     """Shutdown an instance.
399
400     """
401     instance = objects.Instance.FromDict(params[0])
402     return backend.InstanceShutdown(instance)
403
404   @staticmethod
405   def perspective_instance_start(params):
406     """Start an instance.
407
408     """
409     instance = objects.Instance.FromDict(params[0])
410     return backend.StartInstance(instance)
411
412   @staticmethod
413   def perspective_migration_info(params):
414     """Gather information about an instance to be migrated.
415
416     """
417     instance = objects.Instance.FromDict(params[0])
418     return backend.MigrationInfo(instance)
419
420   @staticmethod
421   def perspective_accept_instance(params):
422     """Prepare the node to accept an instance.
423
424     """
425     instance, info, target = params
426     instance = objects.Instance.FromDict(instance)
427     return backend.AcceptInstance(instance, info, target)
428
429   @staticmethod
430   def perspective_finalize_migration(params):
431     """Finalize the instance migration.
432
433     """
434     instance, info, success = params
435     instance = objects.Instance.FromDict(instance)
436     return backend.FinalizeMigration(instance, info, success)
437
438   @staticmethod
439   def perspective_instance_migrate(params):
440     """Migrates an instance.
441
442     """
443     instance, target, live = params
444     instance = objects.Instance.FromDict(instance)
445     return backend.MigrateInstance(instance, target, live)
446
447   @staticmethod
448   def perspective_instance_reboot(params):
449     """Reboot an instance.
450
451     """
452     instance = objects.Instance.FromDict(params[0])
453     reboot_type = params[1]
454     return backend.InstanceReboot(instance, reboot_type)
455
456   @staticmethod
457   def perspective_instance_info(params):
458     """Query instance information.
459
460     """
461     return backend.GetInstanceInfo(params[0], params[1])
462
463   @staticmethod
464   def perspective_instance_migratable(params):
465     """Query whether the specified instance can be migrated.
466
467     """
468     instance = objects.Instance.FromDict(params[0])
469     return backend.GetInstanceMigratable(instance)
470
471   @staticmethod
472   def perspective_all_instances_info(params):
473     """Query information about all instances.
474
475     """
476     return backend.GetAllInstancesInfo(params[0])
477
478   @staticmethod
479   def perspective_instance_list(params):
480     """Query the list of running instances.
481
482     """
483     return True, backend.GetInstanceList(params[0])
484
485   # node --------------------------
486
487   @staticmethod
488   def perspective_node_tcp_ping(params):
489     """Do a TcpPing on the remote node.
490
491     """
492     return utils.TcpPing(params[1], params[2], timeout=params[3],
493                          live_port_needed=params[4], source=params[0])
494
495   @staticmethod
496   def perspective_node_has_ip_address(params):
497     """Checks if a node has the given ip address.
498
499     """
500     return True, utils.OwnIpAddress(params[0])
501
502   @staticmethod
503   def perspective_node_info(params):
504     """Query node information.
505
506     """
507     vgname, hypervisor_type = params
508     return backend.GetNodeInfo(vgname, hypervisor_type)
509
510   @staticmethod
511   def perspective_node_add(params):
512     """Complete the registration of this node in the cluster.
513
514     """
515     return backend.AddNode(params[0], params[1], params[2],
516                            params[3], params[4], params[5])
517
518   @staticmethod
519   def perspective_node_verify(params):
520     """Run a verify sequence on this node.
521
522     """
523     return backend.VerifyNode(params[0], params[1])
524
525   @staticmethod
526   def perspective_node_start_master(params):
527     """Promote this node to master status.
528
529     """
530     return backend.StartMaster(params[0])
531
532   @staticmethod
533   def perspective_node_stop_master(params):
534     """Demote this node from master status.
535
536     """
537     return backend.StopMaster(params[0])
538
539   @staticmethod
540   def perspective_node_leave_cluster(params):
541     """Cleanup after leaving a cluster.
542
543     """
544     return backend.LeaveCluster()
545
546   @staticmethod
547   def perspective_node_volumes(params):
548     """Query the list of all logical volume groups.
549
550     """
551     return backend.NodeVolumes()
552
553   @staticmethod
554   def perspective_node_demote_from_mc(params):
555     """Demote a node from the master candidate role.
556
557     """
558     return backend.DemoteFromMC()
559
560
561   @staticmethod
562   def perspective_node_powercycle(params):
563     """Tries to powercycle the nod.
564
565     """
566     hypervisor_type = params[0]
567     return backend.PowercycleNode(hypervisor_type)
568
569
570   # cluster --------------------------
571
572   @staticmethod
573   def perspective_version(params):
574     """Query version information.
575
576     """
577     return True, constants.PROTOCOL_VERSION
578
579   @staticmethod
580   def perspective_upload_file(params):
581     """Upload a file.
582
583     Note that the backend implementation imposes strict rules on which
584     files are accepted.
585
586     """
587     return backend.UploadFile(*params)
588
589   @staticmethod
590   def perspective_master_info(params):
591     """Query master information.
592
593     """
594     return backend.GetMasterInfo()
595
596   @staticmethod
597   def perspective_write_ssconf_files(params):
598     """Write ssconf files.
599
600     """
601     (values,) = params
602     return backend.WriteSsconfFiles(values)
603
604   # os -----------------------
605
606   @staticmethod
607   def perspective_os_diagnose(params):
608     """Query detailed information about existing OSes.
609
610     """
611     return True, [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]
612
613   @staticmethod
614   def perspective_os_get(params):
615     """Query information about a given OS.
616
617     """
618     name = params[0]
619     try:
620       os_obj = backend.OSFromDisk(name)
621     except errors.InvalidOS, err:
622       os_obj = objects.OS.FromInvalidOS(err)
623     return os_obj.ToDict()
624
625   # hooks -----------------------
626
627   @staticmethod
628   def perspective_hooks_runner(params):
629     """Run hook scripts.
630
631     """
632     hpath, phase, env = params
633     hr = backend.HooksRunner()
634     return hr.RunHooks(hpath, phase, env)
635
636   # iallocator -----------------
637
638   @staticmethod
639   def perspective_iallocator_runner(params):
640     """Run an iallocator script.
641
642     """
643     name, idata = params
644     iar = backend.IAllocatorRunner()
645     return iar.Run(name, idata)
646
647   # test -----------------------
648
649   @staticmethod
650   def perspective_test_delay(params):
651     """Run test delay.
652
653     """
654     duration = params[0]
655     return utils.TestDelay(duration)
656
657   # file storage ---------------
658
659   @staticmethod
660   def perspective_file_storage_dir_create(params):
661     """Create the file storage directory.
662
663     """
664     file_storage_dir = params[0]
665     return backend.CreateFileStorageDir(file_storage_dir)
666
667   @staticmethod
668   def perspective_file_storage_dir_remove(params):
669     """Remove the file storage directory.
670
671     """
672     file_storage_dir = params[0]
673     return backend.RemoveFileStorageDir(file_storage_dir)
674
675   @staticmethod
676   def perspective_file_storage_dir_rename(params):
677     """Rename the file storage directory.
678
679     """
680     old_file_storage_dir = params[0]
681     new_file_storage_dir = params[1]
682     return backend.RenameFileStorageDir(old_file_storage_dir,
683                                         new_file_storage_dir)
684
685   # jobs ------------------------
686
687   @staticmethod
688   @_RequireJobQueueLock
689   def perspective_jobqueue_update(params):
690     """Update job queue.
691
692     """
693     (file_name, content) = params
694     return backend.JobQueueUpdate(file_name, content)
695
696   @staticmethod
697   @_RequireJobQueueLock
698   def perspective_jobqueue_purge(params):
699     """Purge job queue.
700
701     """
702     return backend.JobQueuePurge()
703
704   @staticmethod
705   @_RequireJobQueueLock
706   def perspective_jobqueue_rename(params):
707     """Rename a job queue file.
708
709     """
710     # TODO: What if a file fails to rename?
711     return [backend.JobQueueRename(old, new) for old, new in params]
712
713   @staticmethod
714   def perspective_jobqueue_set_drain(params):
715     """Set/unset the queue drain flag.
716
717     """
718     drain_flag = params[0]
719     return backend.JobQueueSetDrainFlag(drain_flag)
720
721
722   # hypervisor ---------------
723
724   @staticmethod
725   def perspective_hypervisor_validate_params(params):
726     """Validate the hypervisor parameters.
727
728     """
729     (hvname, hvparams) = params
730     return backend.ValidateHVParams(hvname, hvparams)
731
732
733 def ParseOptions():
734   """Parse the command line options.
735
736   @return: (options, args) as from OptionParser.parse_args()
737
738   """
739   parser = OptionParser(description="Ganeti node daemon",
740                         usage="%prog [-f] [-d] [-b ADDRESS]",
741                         version="%%prog (ganeti) %s" %
742                         constants.RELEASE_VERSION)
743
744   parser.add_option("-f", "--foreground", dest="fork",
745                     help="Don't detach from the current terminal",
746                     default=True, action="store_false")
747   parser.add_option("-d", "--debug", dest="debug",
748                     help="Enable some debug messages",
749                     default=False, action="store_true")
750   parser.add_option("-b", "--bind", dest="bind_address",
751                     help="Bind address",
752                     default="", metavar="ADDRESS")
753
754   options, args = parser.parse_args()
755   return options, args
756
757
758 def main():
759   """Main function for the node daemon.
760
761   """
762   global queue_lock
763
764   options, args = ParseOptions()
765   utils.debug = options.debug
766
767   if options.fork:
768     utils.CloseFDs()
769
770   for fname in (constants.SSL_CERT_FILE,):
771     if not os.path.isfile(fname):
772       print "config %s not there, will not run." % fname
773       sys.exit(5)
774
775   try:
776     port = utils.GetNodeDaemonPort()
777   except errors.ConfigurationError, err:
778     print "Cluster configuration incomplete: '%s'" % str(err)
779     sys.exit(5)
780
781   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
782   dirs.append((constants.LOG_OS_DIR, 0750))
783   dirs.append((constants.LOCK_DIR, 1777))
784   utils.EnsureDirs(dirs)
785
786   # become a daemon
787   if options.fork:
788     utils.Daemonize(logfile=constants.LOG_NODESERVER)
789
790   utils.WritePidFile(constants.NODED_PID)
791   try:
792     utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
793                        stderr_logging=not options.fork)
794     logging.info("ganeti node daemon startup")
795
796     # Read SSL certificate
797     ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
798                                     ssl_cert_path=constants.SSL_CERT_FILE)
799
800     # Prepare job queue
801     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
802
803     mainloop = daemon.Mainloop()
804     server = NodeHttpServer(mainloop, options.bind_address, port,
805                             ssl_params=ssl_params, ssl_verify_peer=True)
806     server.Start()
807     try:
808       mainloop.Run()
809     finally:
810       server.Stop()
811   finally:
812     utils.RemovePidFile(constants.NODED_PID)
813
814
815 if __name__ == '__main__':
816   main()