Merge remote branch 'devel-2.1'
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable-msg=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36
37 from optparse import OptionParser
38
39 from ganeti import backend
40 from ganeti import constants
41 from ganeti import objects
42 from ganeti import errors
43 from ganeti import jstore
44 from ganeti import daemon
45 from ganeti import http
46 from ganeti import utils
47 from ganeti import storage
48
49 import ganeti.http.server # pylint: disable-msg=W0611
50
51
52 queue_lock = None
53
54
55 def _RequireJobQueueLock(fn):
56   """Decorator for job queue manipulating functions.
57
58   """
59   QUEUE_LOCK_TIMEOUT = 10
60
61   def wrapper(*args, **kwargs):
62     # Locking in exclusive, blocking mode because there could be several
63     # children running at the same time. Waiting up to 10 seconds.
64     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
65     try:
66       return fn(*args, **kwargs)
67     finally:
68       queue_lock.Unlock()
69
70   return wrapper
71
72
73 class NodeHttpServer(http.server.HttpServer):
74   """The server implementation.
75
76   This class holds all methods exposed over the RPC interface.
77
78   """
79   # too many public methods, and unused args - all methods get params
80   # due to the API
81   # pylint: disable-msg=R0904,W0613
82   def __init__(self, *args, **kwargs):
83     http.server.HttpServer.__init__(self, *args, **kwargs)
84     self.noded_pid = os.getpid()
85
86   def HandleRequest(self, req):
87     """Handle a request.
88
89     """
90     if req.request_method.upper() != http.HTTP_PUT:
91       raise http.HttpBadRequest()
92
93     path = req.request_path
94     if path.startswith("/"):
95       path = path[1:]
96
97     method = getattr(self, "perspective_%s" % path, None)
98     if method is None:
99       raise http.HttpNotFound()
100
101     try:
102       rvalue = method(req.request_body)
103       return True, rvalue
104
105     except backend.RPCFail, err:
106       # our custom failure exception; str(err) works fine if the
107       # exception was constructed with a single argument, and in
108       # this case, err.message == err.args[0] == str(err)
109       return (False, str(err))
110     except errors.QuitGanetiException, err:
111       # Tell parent to quit
112       logging.info("Shutting down the node daemon, arguments: %s",
113                    str(err.args))
114       os.kill(self.noded_pid, signal.SIGTERM)
115       # And return the error's arguments, which must be already in
116       # correct tuple format
117       return err.args
118     except Exception, err:
119       logging.exception("Error in RPC call")
120       return False, "Error while executing backend function: %s" % str(err)
121
122   # the new block devices  --------------------------
123
124   @staticmethod
125   def perspective_blockdev_create(params):
126     """Create a block device.
127
128     """
129     bdev_s, size, owner, on_primary, info = params
130     bdev = objects.Disk.FromDict(bdev_s)
131     if bdev is None:
132       raise ValueError("can't unserialize data!")
133     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
134
135   @staticmethod
136   def perspective_blockdev_remove(params):
137     """Remove a block device.
138
139     """
140     bdev_s = params[0]
141     bdev = objects.Disk.FromDict(bdev_s)
142     return backend.BlockdevRemove(bdev)
143
144   @staticmethod
145   def perspective_blockdev_rename(params):
146     """Remove a block device.
147
148     """
149     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
150     return backend.BlockdevRename(devlist)
151
152   @staticmethod
153   def perspective_blockdev_assemble(params):
154     """Assemble a block device.
155
156     """
157     bdev_s, owner, on_primary = params
158     bdev = objects.Disk.FromDict(bdev_s)
159     if bdev is None:
160       raise ValueError("can't unserialize data!")
161     return backend.BlockdevAssemble(bdev, owner, on_primary)
162
163   @staticmethod
164   def perspective_blockdev_shutdown(params):
165     """Shutdown a block device.
166
167     """
168     bdev_s = params[0]
169     bdev = objects.Disk.FromDict(bdev_s)
170     if bdev is None:
171       raise ValueError("can't unserialize data!")
172     return backend.BlockdevShutdown(bdev)
173
174   @staticmethod
175   def perspective_blockdev_addchildren(params):
176     """Add a child to a mirror device.
177
178     Note: this is only valid for mirror devices. It's the caller's duty
179     to send a correct disk, otherwise we raise an error.
180
181     """
182     bdev_s, ndev_s = params
183     bdev = objects.Disk.FromDict(bdev_s)
184     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
185     if bdev is None or ndevs.count(None) > 0:
186       raise ValueError("can't unserialize data!")
187     return backend.BlockdevAddchildren(bdev, ndevs)
188
189   @staticmethod
190   def perspective_blockdev_removechildren(params):
191     """Remove a child from a mirror device.
192
193     This is only valid for mirror devices, of course. It's the callers
194     duty to send a correct disk, otherwise we raise an error.
195
196     """
197     bdev_s, ndev_s = params
198     bdev = objects.Disk.FromDict(bdev_s)
199     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
200     if bdev is None or ndevs.count(None) > 0:
201       raise ValueError("can't unserialize data!")
202     return backend.BlockdevRemovechildren(bdev, ndevs)
203
204   @staticmethod
205   def perspective_blockdev_getmirrorstatus(params):
206     """Return the mirror status for a list of disks.
207
208     """
209     disks = [objects.Disk.FromDict(dsk_s)
210              for dsk_s in params]
211     return [status.ToDict()
212             for status in backend.BlockdevGetmirrorstatus(disks)]
213
214   @staticmethod
215   def perspective_blockdev_find(params):
216     """Expose the FindBlockDevice functionality for a disk.
217
218     This will try to find but not activate a disk.
219
220     """
221     disk = objects.Disk.FromDict(params[0])
222
223     result = backend.BlockdevFind(disk)
224     if result is None:
225       return None
226
227     return result.ToDict()
228
229   @staticmethod
230   def perspective_blockdev_snapshot(params):
231     """Create a snapshot device.
232
233     Note that this is only valid for LVM disks, if we get passed
234     something else we raise an exception. The snapshot device can be
235     remove by calling the generic block device remove call.
236
237     """
238     cfbd = objects.Disk.FromDict(params[0])
239     return backend.BlockdevSnapshot(cfbd)
240
241   @staticmethod
242   def perspective_blockdev_grow(params):
243     """Grow a stack of devices.
244
245     """
246     cfbd = objects.Disk.FromDict(params[0])
247     amount = params[1]
248     return backend.BlockdevGrow(cfbd, amount)
249
250   @staticmethod
251   def perspective_blockdev_close(params):
252     """Closes the given block devices.
253
254     """
255     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
256     return backend.BlockdevClose(params[0], disks)
257
258   @staticmethod
259   def perspective_blockdev_getsize(params):
260     """Compute the sizes of the given block devices.
261
262     """
263     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
264     return backend.BlockdevGetsize(disks)
265
266   @staticmethod
267   def perspective_blockdev_export(params):
268     """Compute the sizes of the given block devices.
269
270     """
271     disk = objects.Disk.FromDict(params[0])
272     dest_node, dest_path, cluster_name = params[1:]
273     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
274
275   # blockdev/drbd specific methods ----------
276
277   @staticmethod
278   def perspective_drbd_disconnect_net(params):
279     """Disconnects the network connection of drbd disks.
280
281     Note that this is only valid for drbd disks, so the members of the
282     disk list must all be drbd devices.
283
284     """
285     nodes_ip, disks = params
286     disks = [objects.Disk.FromDict(cf) for cf in disks]
287     return backend.DrbdDisconnectNet(nodes_ip, disks)
288
289   @staticmethod
290   def perspective_drbd_attach_net(params):
291     """Attaches the network connection of drbd disks.
292
293     Note that this is only valid for drbd disks, so the members of the
294     disk list must all be drbd devices.
295
296     """
297     nodes_ip, disks, instance_name, multimaster = params
298     disks = [objects.Disk.FromDict(cf) for cf in disks]
299     return backend.DrbdAttachNet(nodes_ip, disks,
300                                      instance_name, multimaster)
301
302   @staticmethod
303   def perspective_drbd_wait_sync(params):
304     """Wait until DRBD disks are synched.
305
306     Note that this is only valid for drbd disks, so the members of the
307     disk list must all be drbd devices.
308
309     """
310     nodes_ip, disks = params
311     disks = [objects.Disk.FromDict(cf) for cf in disks]
312     return backend.DrbdWaitSync(nodes_ip, disks)
313
314   # export/import  --------------------------
315
316   @staticmethod
317   def perspective_snapshot_export(params):
318     """Export a given snapshot.
319
320     """
321     disk = objects.Disk.FromDict(params[0])
322     dest_node = params[1]
323     instance = objects.Instance.FromDict(params[2])
324     cluster_name = params[3]
325     dev_idx = params[4]
326     return backend.ExportSnapshot(disk, dest_node, instance,
327                                   cluster_name, dev_idx)
328
329   @staticmethod
330   def perspective_finalize_export(params):
331     """Expose the finalize export functionality.
332
333     """
334     instance = objects.Instance.FromDict(params[0])
335     snap_disks = [objects.Disk.FromDict(str_data)
336                   for str_data in params[1]]
337     return backend.FinalizeExport(instance, snap_disks)
338
339   @staticmethod
340   def perspective_export_info(params):
341     """Query information about an existing export on this node.
342
343     The given path may not contain an export, in which case we return
344     None.
345
346     """
347     path = params[0]
348     return backend.ExportInfo(path)
349
350   @staticmethod
351   def perspective_export_list(params):
352     """List the available exports on this node.
353
354     Note that as opposed to export_info, which may query data about an
355     export in any path, this only queries the standard Ganeti path
356     (constants.EXPORT_DIR).
357
358     """
359     return backend.ListExports()
360
361   @staticmethod
362   def perspective_export_remove(params):
363     """Remove an export.
364
365     """
366     export = params[0]
367     return backend.RemoveExport(export)
368
369   # volume  --------------------------
370
371   @staticmethod
372   def perspective_lv_list(params):
373     """Query the list of logical volumes in a given volume group.
374
375     """
376     vgname = params[0]
377     return backend.GetVolumeList(vgname)
378
379   @staticmethod
380   def perspective_vg_list(params):
381     """Query the list of volume groups.
382
383     """
384     return backend.ListVolumeGroups()
385
386   # Storage --------------------------
387
388   @staticmethod
389   def perspective_storage_list(params):
390     """Get list of storage units.
391
392     """
393     (su_name, su_args, name, fields) = params
394     return storage.GetStorage(su_name, *su_args).List(name, fields)
395
396   @staticmethod
397   def perspective_storage_modify(params):
398     """Modify a storage unit.
399
400     """
401     (su_name, su_args, name, changes) = params
402     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
403
404   @staticmethod
405   def perspective_storage_execute(params):
406     """Execute an operation on a storage unit.
407
408     """
409     (su_name, su_args, name, op) = params
410     return storage.GetStorage(su_name, *su_args).Execute(name, op)
411
412   # bridge  --------------------------
413
414   @staticmethod
415   def perspective_bridges_exist(params):
416     """Check if all bridges given exist on this node.
417
418     """
419     bridges_list = params[0]
420     return backend.BridgesExist(bridges_list)
421
422   # instance  --------------------------
423
424   @staticmethod
425   def perspective_instance_os_add(params):
426     """Install an OS on a given instance.
427
428     """
429     inst_s = params[0]
430     inst = objects.Instance.FromDict(inst_s)
431     reinstall = params[1]
432     return backend.InstanceOsAdd(inst, reinstall)
433
434   @staticmethod
435   def perspective_instance_run_rename(params):
436     """Runs the OS rename script for an instance.
437
438     """
439     inst_s, old_name = params
440     inst = objects.Instance.FromDict(inst_s)
441     return backend.RunRenameInstance(inst, old_name)
442
443   @staticmethod
444   def perspective_instance_os_import(params):
445     """Run the import function of an OS onto a given instance.
446
447     """
448     inst_s, src_node, src_images, cluster_name = params
449     inst = objects.Instance.FromDict(inst_s)
450     return backend.ImportOSIntoInstance(inst, src_node, src_images,
451                                         cluster_name)
452
453   @staticmethod
454   def perspective_instance_shutdown(params):
455     """Shutdown an instance.
456
457     """
458     instance = objects.Instance.FromDict(params[0])
459     timeout = params[1]
460     return backend.InstanceShutdown(instance, timeout)
461
462   @staticmethod
463   def perspective_instance_start(params):
464     """Start an instance.
465
466     """
467     instance = objects.Instance.FromDict(params[0])
468     return backend.StartInstance(instance)
469
470   @staticmethod
471   def perspective_migration_info(params):
472     """Gather information about an instance to be migrated.
473
474     """
475     instance = objects.Instance.FromDict(params[0])
476     return backend.MigrationInfo(instance)
477
478   @staticmethod
479   def perspective_accept_instance(params):
480     """Prepare the node to accept an instance.
481
482     """
483     instance, info, target = params
484     instance = objects.Instance.FromDict(instance)
485     return backend.AcceptInstance(instance, info, target)
486
487   @staticmethod
488   def perspective_finalize_migration(params):
489     """Finalize the instance migration.
490
491     """
492     instance, info, success = params
493     instance = objects.Instance.FromDict(instance)
494     return backend.FinalizeMigration(instance, info, success)
495
496   @staticmethod
497   def perspective_instance_migrate(params):
498     """Migrates an instance.
499
500     """
501     instance, target, live = params
502     instance = objects.Instance.FromDict(instance)
503     return backend.MigrateInstance(instance, target, live)
504
505   @staticmethod
506   def perspective_instance_reboot(params):
507     """Reboot an instance.
508
509     """
510     instance = objects.Instance.FromDict(params[0])
511     reboot_type = params[1]
512     shutdown_timeout = params[2]
513     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
514
515   @staticmethod
516   def perspective_instance_info(params):
517     """Query instance information.
518
519     """
520     return backend.GetInstanceInfo(params[0], params[1])
521
522   @staticmethod
523   def perspective_instance_migratable(params):
524     """Query whether the specified instance can be migrated.
525
526     """
527     instance = objects.Instance.FromDict(params[0])
528     return backend.GetInstanceMigratable(instance)
529
530   @staticmethod
531   def perspective_all_instances_info(params):
532     """Query information about all instances.
533
534     """
535     return backend.GetAllInstancesInfo(params[0])
536
537   @staticmethod
538   def perspective_instance_list(params):
539     """Query the list of running instances.
540
541     """
542     return backend.GetInstanceList(params[0])
543
544   # node --------------------------
545
546   @staticmethod
547   def perspective_node_tcp_ping(params):
548     """Do a TcpPing on the remote node.
549
550     """
551     return utils.TcpPing(params[1], params[2], timeout=params[3],
552                          live_port_needed=params[4], source=params[0])
553
554   @staticmethod
555   def perspective_node_has_ip_address(params):
556     """Checks if a node has the given ip address.
557
558     """
559     return utils.OwnIpAddress(params[0])
560
561   @staticmethod
562   def perspective_node_info(params):
563     """Query node information.
564
565     """
566     vgname, hypervisor_type = params
567     return backend.GetNodeInfo(vgname, hypervisor_type)
568
569   @staticmethod
570   def perspective_node_add(params):
571     """Complete the registration of this node in the cluster.
572
573     """
574     return backend.AddNode(params[0], params[1], params[2],
575                            params[3], params[4], params[5])
576
577   @staticmethod
578   def perspective_node_verify(params):
579     """Run a verify sequence on this node.
580
581     """
582     return backend.VerifyNode(params[0], params[1])
583
584   @staticmethod
585   def perspective_node_start_master(params):
586     """Promote this node to master status.
587
588     """
589     return backend.StartMaster(params[0], params[1])
590
591   @staticmethod
592   def perspective_node_stop_master(params):
593     """Demote this node from master status.
594
595     """
596     return backend.StopMaster(params[0])
597
598   @staticmethod
599   def perspective_node_leave_cluster(params):
600     """Cleanup after leaving a cluster.
601
602     """
603     return backend.LeaveCluster(params[0])
604
605   @staticmethod
606   def perspective_node_volumes(params):
607     """Query the list of all logical volume groups.
608
609     """
610     return backend.NodeVolumes()
611
612   @staticmethod
613   def perspective_node_demote_from_mc(params):
614     """Demote a node from the master candidate role.
615
616     """
617     return backend.DemoteFromMC()
618
619
620   @staticmethod
621   def perspective_node_powercycle(params):
622     """Tries to powercycle the nod.
623
624     """
625     hypervisor_type = params[0]
626     return backend.PowercycleNode(hypervisor_type)
627
628
629   # cluster --------------------------
630
631   @staticmethod
632   def perspective_version(params):
633     """Query version information.
634
635     """
636     return constants.PROTOCOL_VERSION
637
638   @staticmethod
639   def perspective_upload_file(params):
640     """Upload a file.
641
642     Note that the backend implementation imposes strict rules on which
643     files are accepted.
644
645     """
646     return backend.UploadFile(*params)
647
648   @staticmethod
649   def perspective_master_info(params):
650     """Query master information.
651
652     """
653     return backend.GetMasterInfo()
654
655   @staticmethod
656   def perspective_write_ssconf_files(params):
657     """Write ssconf files.
658
659     """
660     (values,) = params
661     return backend.WriteSsconfFiles(values)
662
663   # os -----------------------
664
665   @staticmethod
666   def perspective_os_diagnose(params):
667     """Query detailed information about existing OSes.
668
669     """
670     return backend.DiagnoseOS()
671
672   @staticmethod
673   def perspective_os_get(params):
674     """Query information about a given OS.
675
676     """
677     name = params[0]
678     os_obj = backend.OSFromDisk(name)
679     return os_obj.ToDict()
680
681   # hooks -----------------------
682
683   @staticmethod
684   def perspective_hooks_runner(params):
685     """Run hook scripts.
686
687     """
688     hpath, phase, env = params
689     hr = backend.HooksRunner()
690     return hr.RunHooks(hpath, phase, env)
691
692   # iallocator -----------------
693
694   @staticmethod
695   def perspective_iallocator_runner(params):
696     """Run an iallocator script.
697
698     """
699     name, idata = params
700     iar = backend.IAllocatorRunner()
701     return iar.Run(name, idata)
702
703   # test -----------------------
704
705   @staticmethod
706   def perspective_test_delay(params):
707     """Run test delay.
708
709     """
710     duration = params[0]
711     status, rval = utils.TestDelay(duration)
712     if not status:
713       raise backend.RPCFail(rval)
714     return rval
715
716   # file storage ---------------
717
718   @staticmethod
719   def perspective_file_storage_dir_create(params):
720     """Create the file storage directory.
721
722     """
723     file_storage_dir = params[0]
724     return backend.CreateFileStorageDir(file_storage_dir)
725
726   @staticmethod
727   def perspective_file_storage_dir_remove(params):
728     """Remove the file storage directory.
729
730     """
731     file_storage_dir = params[0]
732     return backend.RemoveFileStorageDir(file_storage_dir)
733
734   @staticmethod
735   def perspective_file_storage_dir_rename(params):
736     """Rename the file storage directory.
737
738     """
739     old_file_storage_dir = params[0]
740     new_file_storage_dir = params[1]
741     return backend.RenameFileStorageDir(old_file_storage_dir,
742                                         new_file_storage_dir)
743
744   # jobs ------------------------
745
746   @staticmethod
747   @_RequireJobQueueLock
748   def perspective_jobqueue_update(params):
749     """Update job queue.
750
751     """
752     (file_name, content) = params
753     return backend.JobQueueUpdate(file_name, content)
754
755   @staticmethod
756   @_RequireJobQueueLock
757   def perspective_jobqueue_purge(params):
758     """Purge job queue.
759
760     """
761     return backend.JobQueuePurge()
762
763   @staticmethod
764   @_RequireJobQueueLock
765   def perspective_jobqueue_rename(params):
766     """Rename a job queue file.
767
768     """
769     # TODO: What if a file fails to rename?
770     return [backend.JobQueueRename(old, new) for old, new in params]
771
772   @staticmethod
773   def perspective_jobqueue_set_drain(params):
774     """Set/unset the queue drain flag.
775
776     """
777     drain_flag = params[0]
778     return backend.JobQueueSetDrainFlag(drain_flag)
779
780
781   # hypervisor ---------------
782
783   @staticmethod
784   def perspective_hypervisor_validate_params(params):
785     """Validate the hypervisor parameters.
786
787     """
788     (hvname, hvparams) = params
789     return backend.ValidateHVParams(hvname, hvparams)
790
791
792 def CheckNoded(_, args):
793   """Initial checks whether to run or exit with a failure.
794
795   """
796   if args: # noded doesn't take any arguments
797     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
798                           sys.argv[0])
799     sys.exit(constants.EXIT_FAILURE)
800
801
802 def ExecNoded(options, _):
803   """Main node daemon function, executed with the PID file held.
804
805   """
806   global queue_lock # pylint: disable-msg=W0603
807
808   # Read SSL certificate
809   if options.ssl:
810     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
811                                     ssl_cert_path=options.ssl_cert)
812   else:
813     ssl_params = None
814
815   # Prepare job queue
816   queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
817
818   mainloop = daemon.Mainloop()
819   server = NodeHttpServer(mainloop, options.bind_address, options.port,
820                           ssl_params=ssl_params, ssl_verify_peer=True)
821   server.Start()
822   try:
823     mainloop.Run()
824   finally:
825     server.Stop()
826
827
828 def main():
829   """Main function for the node daemon.
830
831   """
832   parser = OptionParser(description="Ganeti node daemon",
833                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
834                         version="%%prog (ganeti) %s" %
835                         constants.RELEASE_VERSION)
836   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
837   dirs.append((constants.LOG_OS_DIR, 0750))
838   dirs.append((constants.LOCK_DIR, 1777))
839   daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
840                      default_ssl_cert=constants.SSL_CERT_FILE,
841                      default_ssl_key=constants.SSL_CERT_FILE)
842
843
844 if __name__ == '__main__':
845   main()