Implement debug level across OS-related RPC calls
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable-msg=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36
37 from optparse import OptionParser
38
39 from ganeti import backend
40 from ganeti import constants
41 from ganeti import objects
42 from ganeti import errors
43 from ganeti import jstore
44 from ganeti import daemon
45 from ganeti import http
46 from ganeti import utils
47 from ganeti import storage
48
49 import ganeti.http.server # pylint: disable-msg=W0611
50
51
52 queue_lock = None
53
54
55 def _RequireJobQueueLock(fn):
56   """Decorator for job queue manipulating functions.
57
58   """
59   QUEUE_LOCK_TIMEOUT = 10
60
61   def wrapper(*args, **kwargs):
62     # Locking in exclusive, blocking mode because there could be several
63     # children running at the same time. Waiting up to 10 seconds.
64     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
65     try:
66       return fn(*args, **kwargs)
67     finally:
68       queue_lock.Unlock()
69
70   return wrapper
71
72
73 class NodeHttpServer(http.server.HttpServer):
74   """The server implementation.
75
76   This class holds all methods exposed over the RPC interface.
77
78   """
79   # too many public methods, and unused args - all methods get params
80   # due to the API
81   # pylint: disable-msg=R0904,W0613
82   def __init__(self, *args, **kwargs):
83     http.server.HttpServer.__init__(self, *args, **kwargs)
84     self.noded_pid = os.getpid()
85
86   def HandleRequest(self, req):
87     """Handle a request.
88
89     """
90     if req.request_method.upper() != http.HTTP_PUT:
91       raise http.HttpBadRequest()
92
93     path = req.request_path
94     if path.startswith("/"):
95       path = path[1:]
96
97     method = getattr(self, "perspective_%s" % path, None)
98     if method is None:
99       raise http.HttpNotFound()
100
101     try:
102       rvalue = method(req.request_body)
103       return True, rvalue
104
105     except backend.RPCFail, err:
106       # our custom failure exception; str(err) works fine if the
107       # exception was constructed with a single argument, and in
108       # this case, err.message == err.args[0] == str(err)
109       return (False, str(err))
110     except errors.QuitGanetiException, err:
111       # Tell parent to quit
112       logging.info("Shutting down the node daemon, arguments: %s",
113                    str(err.args))
114       os.kill(self.noded_pid, signal.SIGTERM)
115       # And return the error's arguments, which must be already in
116       # correct tuple format
117       return err.args
118     except Exception, err:
119       logging.exception("Error in RPC call")
120       return False, "Error while executing backend function: %s" % str(err)
121
122   # the new block devices  --------------------------
123
124   @staticmethod
125   def perspective_blockdev_create(params):
126     """Create a block device.
127
128     """
129     bdev_s, size, owner, on_primary, info = params
130     bdev = objects.Disk.FromDict(bdev_s)
131     if bdev is None:
132       raise ValueError("can't unserialize data!")
133     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
134
135   @staticmethod
136   def perspective_blockdev_remove(params):
137     """Remove a block device.
138
139     """
140     bdev_s = params[0]
141     bdev = objects.Disk.FromDict(bdev_s)
142     return backend.BlockdevRemove(bdev)
143
144   @staticmethod
145   def perspective_blockdev_rename(params):
146     """Remove a block device.
147
148     """
149     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
150     return backend.BlockdevRename(devlist)
151
152   @staticmethod
153   def perspective_blockdev_assemble(params):
154     """Assemble a block device.
155
156     """
157     bdev_s, owner, on_primary = params
158     bdev = objects.Disk.FromDict(bdev_s)
159     if bdev is None:
160       raise ValueError("can't unserialize data!")
161     return backend.BlockdevAssemble(bdev, owner, on_primary)
162
163   @staticmethod
164   def perspective_blockdev_shutdown(params):
165     """Shutdown a block device.
166
167     """
168     bdev_s = params[0]
169     bdev = objects.Disk.FromDict(bdev_s)
170     if bdev is None:
171       raise ValueError("can't unserialize data!")
172     return backend.BlockdevShutdown(bdev)
173
174   @staticmethod
175   def perspective_blockdev_addchildren(params):
176     """Add a child to a mirror device.
177
178     Note: this is only valid for mirror devices. It's the caller's duty
179     to send a correct disk, otherwise we raise an error.
180
181     """
182     bdev_s, ndev_s = params
183     bdev = objects.Disk.FromDict(bdev_s)
184     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
185     if bdev is None or ndevs.count(None) > 0:
186       raise ValueError("can't unserialize data!")
187     return backend.BlockdevAddchildren(bdev, ndevs)
188
189   @staticmethod
190   def perspective_blockdev_removechildren(params):
191     """Remove a child from a mirror device.
192
193     This is only valid for mirror devices, of course. It's the callers
194     duty to send a correct disk, otherwise we raise an error.
195
196     """
197     bdev_s, ndev_s = params
198     bdev = objects.Disk.FromDict(bdev_s)
199     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
200     if bdev is None or ndevs.count(None) > 0:
201       raise ValueError("can't unserialize data!")
202     return backend.BlockdevRemovechildren(bdev, ndevs)
203
204   @staticmethod
205   def perspective_blockdev_getmirrorstatus(params):
206     """Return the mirror status for a list of disks.
207
208     """
209     disks = [objects.Disk.FromDict(dsk_s)
210              for dsk_s in params]
211     return [status.ToDict()
212             for status in backend.BlockdevGetmirrorstatus(disks)]
213
214   @staticmethod
215   def perspective_blockdev_find(params):
216     """Expose the FindBlockDevice functionality for a disk.
217
218     This will try to find but not activate a disk.
219
220     """
221     disk = objects.Disk.FromDict(params[0])
222
223     result = backend.BlockdevFind(disk)
224     if result is None:
225       return None
226
227     return result.ToDict()
228
229   @staticmethod
230   def perspective_blockdev_snapshot(params):
231     """Create a snapshot device.
232
233     Note that this is only valid for LVM disks, if we get passed
234     something else we raise an exception. The snapshot device can be
235     remove by calling the generic block device remove call.
236
237     """
238     cfbd = objects.Disk.FromDict(params[0])
239     return backend.BlockdevSnapshot(cfbd)
240
241   @staticmethod
242   def perspective_blockdev_grow(params):
243     """Grow a stack of devices.
244
245     """
246     cfbd = objects.Disk.FromDict(params[0])
247     amount = params[1]
248     return backend.BlockdevGrow(cfbd, amount)
249
250   @staticmethod
251   def perspective_blockdev_close(params):
252     """Closes the given block devices.
253
254     """
255     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
256     return backend.BlockdevClose(params[0], disks)
257
258   @staticmethod
259   def perspective_blockdev_getsize(params):
260     """Compute the sizes of the given block devices.
261
262     """
263     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
264     return backend.BlockdevGetsize(disks)
265
266   @staticmethod
267   def perspective_blockdev_export(params):
268     """Compute the sizes of the given block devices.
269
270     """
271     disk = objects.Disk.FromDict(params[0])
272     dest_node, dest_path, cluster_name = params[1:]
273     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
274
275   # blockdev/drbd specific methods ----------
276
277   @staticmethod
278   def perspective_drbd_disconnect_net(params):
279     """Disconnects the network connection of drbd disks.
280
281     Note that this is only valid for drbd disks, so the members of the
282     disk list must all be drbd devices.
283
284     """
285     nodes_ip, disks = params
286     disks = [objects.Disk.FromDict(cf) for cf in disks]
287     return backend.DrbdDisconnectNet(nodes_ip, disks)
288
289   @staticmethod
290   def perspective_drbd_attach_net(params):
291     """Attaches the network connection of drbd disks.
292
293     Note that this is only valid for drbd disks, so the members of the
294     disk list must all be drbd devices.
295
296     """
297     nodes_ip, disks, instance_name, multimaster = params
298     disks = [objects.Disk.FromDict(cf) for cf in disks]
299     return backend.DrbdAttachNet(nodes_ip, disks,
300                                      instance_name, multimaster)
301
302   @staticmethod
303   def perspective_drbd_wait_sync(params):
304     """Wait until DRBD disks are synched.
305
306     Note that this is only valid for drbd disks, so the members of the
307     disk list must all be drbd devices.
308
309     """
310     nodes_ip, disks = params
311     disks = [objects.Disk.FromDict(cf) for cf in disks]
312     return backend.DrbdWaitSync(nodes_ip, disks)
313
314   # export/import  --------------------------
315
316   @staticmethod
317   def perspective_snapshot_export(params):
318     """Export a given snapshot.
319
320     """
321     disk = objects.Disk.FromDict(params[0])
322     dest_node = params[1]
323     instance = objects.Instance.FromDict(params[2])
324     cluster_name = params[3]
325     dev_idx = params[4]
326     debug = params[5]
327     return backend.ExportSnapshot(disk, dest_node, instance,
328                                   cluster_name, dev_idx, debug)
329
330   @staticmethod
331   def perspective_finalize_export(params):
332     """Expose the finalize export functionality.
333
334     """
335     instance = objects.Instance.FromDict(params[0])
336     snap_disks = [objects.Disk.FromDict(str_data)
337                   for str_data in params[1]]
338     return backend.FinalizeExport(instance, snap_disks)
339
340   @staticmethod
341   def perspective_export_info(params):
342     """Query information about an existing export on this node.
343
344     The given path may not contain an export, in which case we return
345     None.
346
347     """
348     path = params[0]
349     return backend.ExportInfo(path)
350
351   @staticmethod
352   def perspective_export_list(params):
353     """List the available exports on this node.
354
355     Note that as opposed to export_info, which may query data about an
356     export in any path, this only queries the standard Ganeti path
357     (constants.EXPORT_DIR).
358
359     """
360     return backend.ListExports()
361
362   @staticmethod
363   def perspective_export_remove(params):
364     """Remove an export.
365
366     """
367     export = params[0]
368     return backend.RemoveExport(export)
369
370   # volume  --------------------------
371
372   @staticmethod
373   def perspective_lv_list(params):
374     """Query the list of logical volumes in a given volume group.
375
376     """
377     vgname = params[0]
378     return backend.GetVolumeList(vgname)
379
380   @staticmethod
381   def perspective_vg_list(params):
382     """Query the list of volume groups.
383
384     """
385     return backend.ListVolumeGroups()
386
387   # Storage --------------------------
388
389   @staticmethod
390   def perspective_storage_list(params):
391     """Get list of storage units.
392
393     """
394     (su_name, su_args, name, fields) = params
395     return storage.GetStorage(su_name, *su_args).List(name, fields)
396
397   @staticmethod
398   def perspective_storage_modify(params):
399     """Modify a storage unit.
400
401     """
402     (su_name, su_args, name, changes) = params
403     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
404
405   @staticmethod
406   def perspective_storage_execute(params):
407     """Execute an operation on a storage unit.
408
409     """
410     (su_name, su_args, name, op) = params
411     return storage.GetStorage(su_name, *su_args).Execute(name, op)
412
413   # bridge  --------------------------
414
415   @staticmethod
416   def perspective_bridges_exist(params):
417     """Check if all bridges given exist on this node.
418
419     """
420     bridges_list = params[0]
421     return backend.BridgesExist(bridges_list)
422
423   # instance  --------------------------
424
425   @staticmethod
426   def perspective_instance_os_add(params):
427     """Install an OS on a given instance.
428
429     """
430     inst_s = params[0]
431     inst = objects.Instance.FromDict(inst_s)
432     reinstall = params[1]
433     debug = params[2]
434     return backend.InstanceOsAdd(inst, reinstall, debug)
435
436   @staticmethod
437   def perspective_instance_run_rename(params):
438     """Runs the OS rename script for an instance.
439
440     """
441     inst_s, old_name, debug = params
442     inst = objects.Instance.FromDict(inst_s)
443     return backend.RunRenameInstance(inst, old_name, debug)
444
445   @staticmethod
446   def perspective_instance_os_import(params):
447     """Run the import function of an OS onto a given instance.
448
449     """
450     inst_s, src_node, src_images, cluster_name, debug = params
451     inst = objects.Instance.FromDict(inst_s)
452     return backend.ImportOSIntoInstance(inst, src_node, src_images,
453                                         cluster_name, debug)
454
455   @staticmethod
456   def perspective_instance_shutdown(params):
457     """Shutdown an instance.
458
459     """
460     instance = objects.Instance.FromDict(params[0])
461     timeout = params[1]
462     return backend.InstanceShutdown(instance, timeout)
463
464   @staticmethod
465   def perspective_instance_start(params):
466     """Start an instance.
467
468     """
469     instance = objects.Instance.FromDict(params[0])
470     return backend.StartInstance(instance)
471
472   @staticmethod
473   def perspective_migration_info(params):
474     """Gather information about an instance to be migrated.
475
476     """
477     instance = objects.Instance.FromDict(params[0])
478     return backend.MigrationInfo(instance)
479
480   @staticmethod
481   def perspective_accept_instance(params):
482     """Prepare the node to accept an instance.
483
484     """
485     instance, info, target = params
486     instance = objects.Instance.FromDict(instance)
487     return backend.AcceptInstance(instance, info, target)
488
489   @staticmethod
490   def perspective_finalize_migration(params):
491     """Finalize the instance migration.
492
493     """
494     instance, info, success = params
495     instance = objects.Instance.FromDict(instance)
496     return backend.FinalizeMigration(instance, info, success)
497
498   @staticmethod
499   def perspective_instance_migrate(params):
500     """Migrates an instance.
501
502     """
503     instance, target, live = params
504     instance = objects.Instance.FromDict(instance)
505     return backend.MigrateInstance(instance, target, live)
506
507   @staticmethod
508   def perspective_instance_reboot(params):
509     """Reboot an instance.
510
511     """
512     instance = objects.Instance.FromDict(params[0])
513     reboot_type = params[1]
514     shutdown_timeout = params[2]
515     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
516
517   @staticmethod
518   def perspective_instance_info(params):
519     """Query instance information.
520
521     """
522     return backend.GetInstanceInfo(params[0], params[1])
523
524   @staticmethod
525   def perspective_instance_migratable(params):
526     """Query whether the specified instance can be migrated.
527
528     """
529     instance = objects.Instance.FromDict(params[0])
530     return backend.GetInstanceMigratable(instance)
531
532   @staticmethod
533   def perspective_all_instances_info(params):
534     """Query information about all instances.
535
536     """
537     return backend.GetAllInstancesInfo(params[0])
538
539   @staticmethod
540   def perspective_instance_list(params):
541     """Query the list of running instances.
542
543     """
544     return backend.GetInstanceList(params[0])
545
546   # node --------------------------
547
548   @staticmethod
549   def perspective_node_tcp_ping(params):
550     """Do a TcpPing on the remote node.
551
552     """
553     return utils.TcpPing(params[1], params[2], timeout=params[3],
554                          live_port_needed=params[4], source=params[0])
555
556   @staticmethod
557   def perspective_node_has_ip_address(params):
558     """Checks if a node has the given ip address.
559
560     """
561     return utils.OwnIpAddress(params[0])
562
563   @staticmethod
564   def perspective_node_info(params):
565     """Query node information.
566
567     """
568     vgname, hypervisor_type = params
569     return backend.GetNodeInfo(vgname, hypervisor_type)
570
571   @staticmethod
572   def perspective_node_add(params):
573     """Complete the registration of this node in the cluster.
574
575     """
576     return backend.AddNode(params[0], params[1], params[2],
577                            params[3], params[4], params[5])
578
579   @staticmethod
580   def perspective_node_verify(params):
581     """Run a verify sequence on this node.
582
583     """
584     return backend.VerifyNode(params[0], params[1])
585
586   @staticmethod
587   def perspective_node_start_master(params):
588     """Promote this node to master status.
589
590     """
591     return backend.StartMaster(params[0], params[1])
592
593   @staticmethod
594   def perspective_node_stop_master(params):
595     """Demote this node from master status.
596
597     """
598     return backend.StopMaster(params[0])
599
600   @staticmethod
601   def perspective_node_leave_cluster(params):
602     """Cleanup after leaving a cluster.
603
604     """
605     return backend.LeaveCluster(params[0])
606
607   @staticmethod
608   def perspective_node_volumes(params):
609     """Query the list of all logical volume groups.
610
611     """
612     return backend.NodeVolumes()
613
614   @staticmethod
615   def perspective_node_demote_from_mc(params):
616     """Demote a node from the master candidate role.
617
618     """
619     return backend.DemoteFromMC()
620
621
622   @staticmethod
623   def perspective_node_powercycle(params):
624     """Tries to powercycle the nod.
625
626     """
627     hypervisor_type = params[0]
628     return backend.PowercycleNode(hypervisor_type)
629
630
631   # cluster --------------------------
632
633   @staticmethod
634   def perspective_version(params):
635     """Query version information.
636
637     """
638     return constants.PROTOCOL_VERSION
639
640   @staticmethod
641   def perspective_upload_file(params):
642     """Upload a file.
643
644     Note that the backend implementation imposes strict rules on which
645     files are accepted.
646
647     """
648     return backend.UploadFile(*params)
649
650   @staticmethod
651   def perspective_master_info(params):
652     """Query master information.
653
654     """
655     return backend.GetMasterInfo()
656
657   @staticmethod
658   def perspective_write_ssconf_files(params):
659     """Write ssconf files.
660
661     """
662     (values,) = params
663     return backend.WriteSsconfFiles(values)
664
665   # os -----------------------
666
667   @staticmethod
668   def perspective_os_diagnose(params):
669     """Query detailed information about existing OSes.
670
671     """
672     return backend.DiagnoseOS()
673
674   @staticmethod
675   def perspective_os_get(params):
676     """Query information about a given OS.
677
678     """
679     name = params[0]
680     os_obj = backend.OSFromDisk(name)
681     return os_obj.ToDict()
682
683   # hooks -----------------------
684
685   @staticmethod
686   def perspective_hooks_runner(params):
687     """Run hook scripts.
688
689     """
690     hpath, phase, env = params
691     hr = backend.HooksRunner()
692     return hr.RunHooks(hpath, phase, env)
693
694   # iallocator -----------------
695
696   @staticmethod
697   def perspective_iallocator_runner(params):
698     """Run an iallocator script.
699
700     """
701     name, idata = params
702     iar = backend.IAllocatorRunner()
703     return iar.Run(name, idata)
704
705   # test -----------------------
706
707   @staticmethod
708   def perspective_test_delay(params):
709     """Run test delay.
710
711     """
712     duration = params[0]
713     status, rval = utils.TestDelay(duration)
714     if not status:
715       raise backend.RPCFail(rval)
716     return rval
717
718   # file storage ---------------
719
720   @staticmethod
721   def perspective_file_storage_dir_create(params):
722     """Create the file storage directory.
723
724     """
725     file_storage_dir = params[0]
726     return backend.CreateFileStorageDir(file_storage_dir)
727
728   @staticmethod
729   def perspective_file_storage_dir_remove(params):
730     """Remove the file storage directory.
731
732     """
733     file_storage_dir = params[0]
734     return backend.RemoveFileStorageDir(file_storage_dir)
735
736   @staticmethod
737   def perspective_file_storage_dir_rename(params):
738     """Rename the file storage directory.
739
740     """
741     old_file_storage_dir = params[0]
742     new_file_storage_dir = params[1]
743     return backend.RenameFileStorageDir(old_file_storage_dir,
744                                         new_file_storage_dir)
745
746   # jobs ------------------------
747
748   @staticmethod
749   @_RequireJobQueueLock
750   def perspective_jobqueue_update(params):
751     """Update job queue.
752
753     """
754     (file_name, content) = params
755     return backend.JobQueueUpdate(file_name, content)
756
757   @staticmethod
758   @_RequireJobQueueLock
759   def perspective_jobqueue_purge(params):
760     """Purge job queue.
761
762     """
763     return backend.JobQueuePurge()
764
765   @staticmethod
766   @_RequireJobQueueLock
767   def perspective_jobqueue_rename(params):
768     """Rename a job queue file.
769
770     """
771     # TODO: What if a file fails to rename?
772     return [backend.JobQueueRename(old, new) for old, new in params]
773
774   @staticmethod
775   def perspective_jobqueue_set_drain(params):
776     """Set/unset the queue drain flag.
777
778     """
779     drain_flag = params[0]
780     return backend.JobQueueSetDrainFlag(drain_flag)
781
782
783   # hypervisor ---------------
784
785   @staticmethod
786   def perspective_hypervisor_validate_params(params):
787     """Validate the hypervisor parameters.
788
789     """
790     (hvname, hvparams) = params
791     return backend.ValidateHVParams(hvname, hvparams)
792
793
794 def CheckNoded(_, args):
795   """Initial checks whether to run or exit with a failure.
796
797   """
798   if args: # noded doesn't take any arguments
799     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
800                           sys.argv[0])
801     sys.exit(constants.EXIT_FAILURE)
802
803
804 def ExecNoded(options, _):
805   """Main node daemon function, executed with the PID file held.
806
807   """
808   global queue_lock # pylint: disable-msg=W0603
809
810   # Read SSL certificate
811   if options.ssl:
812     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
813                                     ssl_cert_path=options.ssl_cert)
814   else:
815     ssl_params = None
816
817   # Prepare job queue
818   queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
819
820   mainloop = daemon.Mainloop()
821   server = NodeHttpServer(mainloop, options.bind_address, options.port,
822                           ssl_params=ssl_params, ssl_verify_peer=True)
823   server.Start()
824   try:
825     mainloop.Run()
826   finally:
827     server.Stop()
828
829
830 def main():
831   """Main function for the node daemon.
832
833   """
834   parser = OptionParser(description="Ganeti node daemon",
835                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
836                         version="%%prog (ganeti) %s" %
837                         constants.RELEASE_VERSION)
838   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
839   dirs.append((constants.LOG_OS_DIR, 0750))
840   dirs.append((constants.LOCK_DIR, 1777))
841   daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded)
842
843
844 if __name__ == '__main__':
845   main()