Further pylint disables, mostly for Unused args
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable-msg=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import SocketServer
35 import logging
36 import signal
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49
50 import ganeti.http.server
51
52
53 queue_lock = None
54
55
56 def _RequireJobQueueLock(fn):
57   """Decorator for job queue manipulating functions.
58
59   """
60   QUEUE_LOCK_TIMEOUT = 10
61
62   def wrapper(*args, **kwargs):
63     # Locking in exclusive, blocking mode because there could be several
64     # children running at the same time. Waiting up to 10 seconds.
65     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
66     try:
67       return fn(*args, **kwargs)
68     finally:
69       queue_lock.Unlock()
70
71   return wrapper
72
73
74 class NodeHttpServer(http.server.HttpServer):
75   """The server implementation.
76
77   This class holds all methods exposed over the RPC interface.
78
79   """
80   # too many public methods, and unused args - all methods get params
81   # due to the API
82   # pylint: disable-msg=R0904,W0613
83   def __init__(self, *args, **kwargs):
84     http.server.HttpServer.__init__(self, *args, **kwargs)
85     self.noded_pid = os.getpid()
86
87   def HandleRequest(self, req):
88     """Handle a request.
89
90     """
91     if req.request_method.upper() != http.HTTP_PUT:
92       raise http.HttpBadRequest()
93
94     path = req.request_path
95     if path.startswith("/"):
96       path = path[1:]
97
98     method = getattr(self, "perspective_%s" % path, None)
99     if method is None:
100       raise http.HttpNotFound()
101
102     try:
103       rvalue = method(req.request_body)
104       return True, rvalue
105
106     except backend.RPCFail, err:
107       # our custom failure exception; str(err) works fine if the
108       # exception was constructed with a single argument, and in
109       # this case, err.message == err.args[0] == str(err)
110       return (False, str(err))
111     except errors.QuitGanetiException, err:
112       # Tell parent to quit
113       logging.info("Shutting down the node daemon, arguments: %s",
114                    str(err.args))
115       os.kill(self.noded_pid, signal.SIGTERM)
116       # And return the error's arguments, which must be already in
117       # correct tuple format
118       return err.args
119     except Exception, err:
120       logging.exception("Error in RPC call")
121       return False, "Error while executing backend function: %s" % str(err)
122
123   # the new block devices  --------------------------
124
125   @staticmethod
126   def perspective_blockdev_create(params):
127     """Create a block device.
128
129     """
130     bdev_s, size, owner, on_primary, info = params
131     bdev = objects.Disk.FromDict(bdev_s)
132     if bdev is None:
133       raise ValueError("can't unserialize data!")
134     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
135
136   @staticmethod
137   def perspective_blockdev_remove(params):
138     """Remove a block device.
139
140     """
141     bdev_s = params[0]
142     bdev = objects.Disk.FromDict(bdev_s)
143     return backend.BlockdevRemove(bdev)
144
145   @staticmethod
146   def perspective_blockdev_rename(params):
147     """Remove a block device.
148
149     """
150     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
151     return backend.BlockdevRename(devlist)
152
153   @staticmethod
154   def perspective_blockdev_assemble(params):
155     """Assemble a block device.
156
157     """
158     bdev_s, owner, on_primary = params
159     bdev = objects.Disk.FromDict(bdev_s)
160     if bdev is None:
161       raise ValueError("can't unserialize data!")
162     return backend.BlockdevAssemble(bdev, owner, on_primary)
163
164   @staticmethod
165   def perspective_blockdev_shutdown(params):
166     """Shutdown a block device.
167
168     """
169     bdev_s = params[0]
170     bdev = objects.Disk.FromDict(bdev_s)
171     if bdev is None:
172       raise ValueError("can't unserialize data!")
173     return backend.BlockdevShutdown(bdev)
174
175   @staticmethod
176   def perspective_blockdev_addchildren(params):
177     """Add a child to a mirror device.
178
179     Note: this is only valid for mirror devices. It's the caller's duty
180     to send a correct disk, otherwise we raise an error.
181
182     """
183     bdev_s, ndev_s = params
184     bdev = objects.Disk.FromDict(bdev_s)
185     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
186     if bdev is None or ndevs.count(None) > 0:
187       raise ValueError("can't unserialize data!")
188     return backend.BlockdevAddchildren(bdev, ndevs)
189
190   @staticmethod
191   def perspective_blockdev_removechildren(params):
192     """Remove a child from a mirror device.
193
194     This is only valid for mirror devices, of course. It's the callers
195     duty to send a correct disk, otherwise we raise an error.
196
197     """
198     bdev_s, ndev_s = params
199     bdev = objects.Disk.FromDict(bdev_s)
200     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
201     if bdev is None or ndevs.count(None) > 0:
202       raise ValueError("can't unserialize data!")
203     return backend.BlockdevRemovechildren(bdev, ndevs)
204
205   @staticmethod
206   def perspective_blockdev_getmirrorstatus(params):
207     """Return the mirror status for a list of disks.
208
209     """
210     disks = [objects.Disk.FromDict(dsk_s)
211              for dsk_s in params]
212     return [status.ToDict()
213             for status in backend.BlockdevGetmirrorstatus(disks)]
214
215   @staticmethod
216   def perspective_blockdev_find(params):
217     """Expose the FindBlockDevice functionality for a disk.
218
219     This will try to find but not activate a disk.
220
221     """
222     disk = objects.Disk.FromDict(params[0])
223
224     result = backend.BlockdevFind(disk)
225     if result is None:
226       return None
227
228     return result.ToDict()
229
230   @staticmethod
231   def perspective_blockdev_snapshot(params):
232     """Create a snapshot device.
233
234     Note that this is only valid for LVM disks, if we get passed
235     something else we raise an exception. The snapshot device can be
236     remove by calling the generic block device remove call.
237
238     """
239     cfbd = objects.Disk.FromDict(params[0])
240     return backend.BlockdevSnapshot(cfbd)
241
242   @staticmethod
243   def perspective_blockdev_grow(params):
244     """Grow a stack of devices.
245
246     """
247     cfbd = objects.Disk.FromDict(params[0])
248     amount = params[1]
249     return backend.BlockdevGrow(cfbd, amount)
250
251   @staticmethod
252   def perspective_blockdev_close(params):
253     """Closes the given block devices.
254
255     """
256     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
257     return backend.BlockdevClose(params[0], disks)
258
259   @staticmethod
260   def perspective_blockdev_getsize(params):
261     """Compute the sizes of the given block devices.
262
263     """
264     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
265     return backend.BlockdevGetsize(disks)
266
267   @staticmethod
268   def perspective_blockdev_export(params):
269     """Compute the sizes of the given block devices.
270
271     """
272     disk = objects.Disk.FromDict(params[0])
273     dest_node, dest_path, cluster_name = params[1:]
274     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
275
276   # blockdev/drbd specific methods ----------
277
278   @staticmethod
279   def perspective_drbd_disconnect_net(params):
280     """Disconnects the network connection of drbd disks.
281
282     Note that this is only valid for drbd disks, so the members of the
283     disk list must all be drbd devices.
284
285     """
286     nodes_ip, disks = params
287     disks = [objects.Disk.FromDict(cf) for cf in disks]
288     return backend.DrbdDisconnectNet(nodes_ip, disks)
289
290   @staticmethod
291   def perspective_drbd_attach_net(params):
292     """Attaches the network connection of drbd disks.
293
294     Note that this is only valid for drbd disks, so the members of the
295     disk list must all be drbd devices.
296
297     """
298     nodes_ip, disks, instance_name, multimaster = params
299     disks = [objects.Disk.FromDict(cf) for cf in disks]
300     return backend.DrbdAttachNet(nodes_ip, disks,
301                                      instance_name, multimaster)
302
303   @staticmethod
304   def perspective_drbd_wait_sync(params):
305     """Wait until DRBD disks are synched.
306
307     Note that this is only valid for drbd disks, so the members of the
308     disk list must all be drbd devices.
309
310     """
311     nodes_ip, disks = params
312     disks = [objects.Disk.FromDict(cf) for cf in disks]
313     return backend.DrbdWaitSync(nodes_ip, disks)
314
315   # export/import  --------------------------
316
317   @staticmethod
318   def perspective_snapshot_export(params):
319     """Export a given snapshot.
320
321     """
322     disk = objects.Disk.FromDict(params[0])
323     dest_node = params[1]
324     instance = objects.Instance.FromDict(params[2])
325     cluster_name = params[3]
326     dev_idx = params[4]
327     return backend.ExportSnapshot(disk, dest_node, instance,
328                                   cluster_name, dev_idx)
329
330   @staticmethod
331   def perspective_finalize_export(params):
332     """Expose the finalize export functionality.
333
334     """
335     instance = objects.Instance.FromDict(params[0])
336     snap_disks = [objects.Disk.FromDict(str_data)
337                   for str_data in params[1]]
338     return backend.FinalizeExport(instance, snap_disks)
339
340   @staticmethod
341   def perspective_export_info(params):
342     """Query information about an existing export on this node.
343
344     The given path may not contain an export, in which case we return
345     None.
346
347     """
348     path = params[0]
349     return backend.ExportInfo(path)
350
351   @staticmethod
352   def perspective_export_list(params):
353     """List the available exports on this node.
354
355     Note that as opposed to export_info, which may query data about an
356     export in any path, this only queries the standard Ganeti path
357     (constants.EXPORT_DIR).
358
359     """
360     return backend.ListExports()
361
362   @staticmethod
363   def perspective_export_remove(params):
364     """Remove an export.
365
366     """
367     export = params[0]
368     return backend.RemoveExport(export)
369
370   # volume  --------------------------
371
372   @staticmethod
373   def perspective_lv_list(params):
374     """Query the list of logical volumes in a given volume group.
375
376     """
377     vgname = params[0]
378     return backend.GetVolumeList(vgname)
379
380   @staticmethod
381   def perspective_vg_list(params):
382     """Query the list of volume groups.
383
384     """
385     return backend.ListVolumeGroups()
386
387   # Storage --------------------------
388
389   @staticmethod
390   def perspective_storage_list(params):
391     """Get list of storage units.
392
393     """
394     (su_name, su_args, name, fields) = params
395     return storage.GetStorage(su_name, *su_args).List(name, fields)
396
397   @staticmethod
398   def perspective_storage_modify(params):
399     """Modify a storage unit.
400
401     """
402     (su_name, su_args, name, changes) = params
403     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
404
405   @staticmethod
406   def perspective_storage_execute(params):
407     """Execute an operation on a storage unit.
408
409     """
410     (su_name, su_args, name, op) = params
411     return storage.GetStorage(su_name, *su_args).Execute(name, op)
412
413   # bridge  --------------------------
414
415   @staticmethod
416   def perspective_bridges_exist(params):
417     """Check if all bridges given exist on this node.
418
419     """
420     bridges_list = params[0]
421     return backend.BridgesExist(bridges_list)
422
423   # instance  --------------------------
424
425   @staticmethod
426   def perspective_instance_os_add(params):
427     """Install an OS on a given instance.
428
429     """
430     inst_s = params[0]
431     inst = objects.Instance.FromDict(inst_s)
432     reinstall = params[1]
433     return backend.InstanceOsAdd(inst, reinstall)
434
435   @staticmethod
436   def perspective_instance_run_rename(params):
437     """Runs the OS rename script for an instance.
438
439     """
440     inst_s, old_name = params
441     inst = objects.Instance.FromDict(inst_s)
442     return backend.RunRenameInstance(inst, old_name)
443
444   @staticmethod
445   def perspective_instance_os_import(params):
446     """Run the import function of an OS onto a given instance.
447
448     """
449     inst_s, src_node, src_images, cluster_name = params
450     inst = objects.Instance.FromDict(inst_s)
451     return backend.ImportOSIntoInstance(inst, src_node, src_images,
452                                         cluster_name)
453
454   @staticmethod
455   def perspective_instance_shutdown(params):
456     """Shutdown an instance.
457
458     """
459     instance = objects.Instance.FromDict(params[0])
460     timeout = params[1]
461     return backend.InstanceShutdown(instance, timeout)
462
463   @staticmethod
464   def perspective_instance_start(params):
465     """Start an instance.
466
467     """
468     instance = objects.Instance.FromDict(params[0])
469     return backend.StartInstance(instance)
470
471   @staticmethod
472   def perspective_migration_info(params):
473     """Gather information about an instance to be migrated.
474
475     """
476     instance = objects.Instance.FromDict(params[0])
477     return backend.MigrationInfo(instance)
478
479   @staticmethod
480   def perspective_accept_instance(params):
481     """Prepare the node to accept an instance.
482
483     """
484     instance, info, target = params
485     instance = objects.Instance.FromDict(instance)
486     return backend.AcceptInstance(instance, info, target)
487
488   @staticmethod
489   def perspective_finalize_migration(params):
490     """Finalize the instance migration.
491
492     """
493     instance, info, success = params
494     instance = objects.Instance.FromDict(instance)
495     return backend.FinalizeMigration(instance, info, success)
496
497   @staticmethod
498   def perspective_instance_migrate(params):
499     """Migrates an instance.
500
501     """
502     instance, target, live = params
503     instance = objects.Instance.FromDict(instance)
504     return backend.MigrateInstance(instance, target, live)
505
506   @staticmethod
507   def perspective_instance_reboot(params):
508     """Reboot an instance.
509
510     """
511     instance = objects.Instance.FromDict(params[0])
512     reboot_type = params[1]
513     shutdown_timeout = params[2]
514     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
515
516   @staticmethod
517   def perspective_instance_info(params):
518     """Query instance information.
519
520     """
521     return backend.GetInstanceInfo(params[0], params[1])
522
523   @staticmethod
524   def perspective_instance_migratable(params):
525     """Query whether the specified instance can be migrated.
526
527     """
528     instance = objects.Instance.FromDict(params[0])
529     return backend.GetInstanceMigratable(instance)
530
531   @staticmethod
532   def perspective_all_instances_info(params):
533     """Query information about all instances.
534
535     """
536     return backend.GetAllInstancesInfo(params[0])
537
538   @staticmethod
539   def perspective_instance_list(params):
540     """Query the list of running instances.
541
542     """
543     return backend.GetInstanceList(params[0])
544
545   # node --------------------------
546
547   @staticmethod
548   def perspective_node_tcp_ping(params):
549     """Do a TcpPing on the remote node.
550
551     """
552     return utils.TcpPing(params[1], params[2], timeout=params[3],
553                          live_port_needed=params[4], source=params[0])
554
555   @staticmethod
556   def perspective_node_has_ip_address(params):
557     """Checks if a node has the given ip address.
558
559     """
560     return utils.OwnIpAddress(params[0])
561
562   @staticmethod
563   def perspective_node_info(params):
564     """Query node information.
565
566     """
567     vgname, hypervisor_type = params
568     return backend.GetNodeInfo(vgname, hypervisor_type)
569
570   @staticmethod
571   def perspective_node_add(params):
572     """Complete the registration of this node in the cluster.
573
574     """
575     return backend.AddNode(params[0], params[1], params[2],
576                            params[3], params[4], params[5])
577
578   @staticmethod
579   def perspective_node_verify(params):
580     """Run a verify sequence on this node.
581
582     """
583     return backend.VerifyNode(params[0], params[1])
584
585   @staticmethod
586   def perspective_node_start_master(params):
587     """Promote this node to master status.
588
589     """
590     return backend.StartMaster(params[0], params[1])
591
592   @staticmethod
593   def perspective_node_stop_master(params):
594     """Demote this node from master status.
595
596     """
597     return backend.StopMaster(params[0])
598
599   @staticmethod
600   def perspective_node_leave_cluster(params):
601     """Cleanup after leaving a cluster.
602
603     """
604     return backend.LeaveCluster(params[0])
605
606   @staticmethod
607   def perspective_node_volumes(params):
608     """Query the list of all logical volume groups.
609
610     """
611     return backend.NodeVolumes()
612
613   @staticmethod
614   def perspective_node_demote_from_mc(params):
615     """Demote a node from the master candidate role.
616
617     """
618     return backend.DemoteFromMC()
619
620
621   @staticmethod
622   def perspective_node_powercycle(params):
623     """Tries to powercycle the nod.
624
625     """
626     hypervisor_type = params[0]
627     return backend.PowercycleNode(hypervisor_type)
628
629
630   # cluster --------------------------
631
632   @staticmethod
633   def perspective_version(params):
634     """Query version information.
635
636     """
637     return constants.PROTOCOL_VERSION
638
639   @staticmethod
640   def perspective_upload_file(params):
641     """Upload a file.
642
643     Note that the backend implementation imposes strict rules on which
644     files are accepted.
645
646     """
647     return backend.UploadFile(*params)
648
649   @staticmethod
650   def perspective_master_info(params):
651     """Query master information.
652
653     """
654     return backend.GetMasterInfo()
655
656   @staticmethod
657   def perspective_write_ssconf_files(params):
658     """Write ssconf files.
659
660     """
661     (values,) = params
662     return backend.WriteSsconfFiles(values)
663
664   # os -----------------------
665
666   @staticmethod
667   def perspective_os_diagnose(params):
668     """Query detailed information about existing OSes.
669
670     """
671     return backend.DiagnoseOS()
672
673   @staticmethod
674   def perspective_os_get(params):
675     """Query information about a given OS.
676
677     """
678     name = params[0]
679     os_obj = backend.OSFromDisk(name)
680     return os_obj.ToDict()
681
682   # hooks -----------------------
683
684   @staticmethod
685   def perspective_hooks_runner(params):
686     """Run hook scripts.
687
688     """
689     hpath, phase, env = params
690     hr = backend.HooksRunner()
691     return hr.RunHooks(hpath, phase, env)
692
693   # iallocator -----------------
694
695   @staticmethod
696   def perspective_iallocator_runner(params):
697     """Run an iallocator script.
698
699     """
700     name, idata = params
701     iar = backend.IAllocatorRunner()
702     return iar.Run(name, idata)
703
704   # test -----------------------
705
706   @staticmethod
707   def perspective_test_delay(params):
708     """Run test delay.
709
710     """
711     duration = params[0]
712     status, rval = utils.TestDelay(duration)
713     if not status:
714       raise backend.RPCFail(rval)
715     return rval
716
717   # file storage ---------------
718
719   @staticmethod
720   def perspective_file_storage_dir_create(params):
721     """Create the file storage directory.
722
723     """
724     file_storage_dir = params[0]
725     return backend.CreateFileStorageDir(file_storage_dir)
726
727   @staticmethod
728   def perspective_file_storage_dir_remove(params):
729     """Remove the file storage directory.
730
731     """
732     file_storage_dir = params[0]
733     return backend.RemoveFileStorageDir(file_storage_dir)
734
735   @staticmethod
736   def perspective_file_storage_dir_rename(params):
737     """Rename the file storage directory.
738
739     """
740     old_file_storage_dir = params[0]
741     new_file_storage_dir = params[1]
742     return backend.RenameFileStorageDir(old_file_storage_dir,
743                                         new_file_storage_dir)
744
745   # jobs ------------------------
746
747   @staticmethod
748   @_RequireJobQueueLock
749   def perspective_jobqueue_update(params):
750     """Update job queue.
751
752     """
753     (file_name, content) = params
754     return backend.JobQueueUpdate(file_name, content)
755
756   @staticmethod
757   @_RequireJobQueueLock
758   def perspective_jobqueue_purge(params):
759     """Purge job queue.
760
761     """
762     return backend.JobQueuePurge()
763
764   @staticmethod
765   @_RequireJobQueueLock
766   def perspective_jobqueue_rename(params):
767     """Rename a job queue file.
768
769     """
770     # TODO: What if a file fails to rename?
771     return [backend.JobQueueRename(old, new) for old, new in params]
772
773   @staticmethod
774   def perspective_jobqueue_set_drain(params):
775     """Set/unset the queue drain flag.
776
777     """
778     drain_flag = params[0]
779     return backend.JobQueueSetDrainFlag(drain_flag)
780
781
782   # hypervisor ---------------
783
784   @staticmethod
785   def perspective_hypervisor_validate_params(params):
786     """Validate the hypervisor parameters.
787
788     """
789     (hvname, hvparams) = params
790     return backend.ValidateHVParams(hvname, hvparams)
791
792
793 def CheckNoded(_, args):
794   """Initial checks whether to run or exit with a failure.
795
796   """
797   if args: # noded doesn't take any arguments
798     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
799                           sys.argv[0])
800     sys.exit(constants.EXIT_FAILURE)
801
802
803 def ExecNoded(options, _):
804   """Main node daemon function, executed with the PID file held.
805
806   """
807   global queue_lock # pylint: disable-msg=W0603
808
809   # Read SSL certificate
810   if options.ssl:
811     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
812                                     ssl_cert_path=options.ssl_cert)
813   else:
814     ssl_params = None
815
816   # Prepare job queue
817   queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
818
819   mainloop = daemon.Mainloop()
820   server = NodeHttpServer(mainloop, options.bind_address, options.port,
821                           ssl_params=ssl_params, ssl_verify_peer=True)
822   server.Start()
823   try:
824     mainloop.Run()
825   finally:
826     server.Stop()
827
828
829 def main():
830   """Main function for the node daemon.
831
832   """
833   parser = OptionParser(description="Ganeti node daemon",
834                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
835                         version="%%prog (ganeti) %s" %
836                         constants.RELEASE_VERSION)
837   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
838   dirs.append((constants.LOG_OS_DIR, 0750))
839   dirs.append((constants.LOCK_DIR, 1777))
840   daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded)
841
842
843 if __name__ == '__main__':
844   main()