Accept shutdown timeout from the user
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import SocketServer
30 import logging
31 import signal
32
33 from optparse import OptionParser
34
35 from ganeti import backend
36 from ganeti import constants
37 from ganeti import objects
38 from ganeti import errors
39 from ganeti import jstore
40 from ganeti import daemon
41 from ganeti import http
42 from ganeti import utils
43 from ganeti import storage
44
45 import ganeti.http.server
46
47
48 queue_lock = None
49
50
51 def _RequireJobQueueLock(fn):
52   """Decorator for job queue manipulating functions.
53
54   """
55   QUEUE_LOCK_TIMEOUT = 10
56
57   def wrapper(*args, **kwargs):
58     # Locking in exclusive, blocking mode because there could be several
59     # children running at the same time. Waiting up to 10 seconds.
60     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
61     try:
62       return fn(*args, **kwargs)
63     finally:
64       queue_lock.Unlock()
65
66   return wrapper
67
68
69 class NodeHttpServer(http.server.HttpServer):
70   """The server implementation.
71
72   This class holds all methods exposed over the RPC interface.
73
74   """
75   def __init__(self, *args, **kwargs):
76     http.server.HttpServer.__init__(self, *args, **kwargs)
77     self.noded_pid = os.getpid()
78
79   def HandleRequest(self, req):
80     """Handle a request.
81
82     """
83     if req.request_method.upper() != http.HTTP_PUT:
84       raise http.HttpBadRequest()
85
86     path = req.request_path
87     if path.startswith("/"):
88       path = path[1:]
89
90     method = getattr(self, "perspective_%s" % path, None)
91     if method is None:
92       raise http.HttpNotFound()
93
94     try:
95       rvalue = method(req.request_body)
96       return True, rvalue
97
98     except backend.RPCFail, err:
99       # our custom failure exception; str(err) works fine if the
100       # exception was constructed with a single argument, and in
101       # this case, err.message == err.args[0] == str(err)
102       return (False, str(err))
103     except errors.QuitGanetiException, err:
104       # Tell parent to quit
105       logging.info("Shutting down the node daemon, arguments: %s",
106                    str(err.args))
107       os.kill(self.noded_pid, signal.SIGTERM)
108       # And return the error's arguments, which must be already in
109       # correct tuple format
110       return err.args
111     except Exception, err:
112       logging.exception("Error in RPC call")
113       return False, "Error while executing backend function: %s" % str(err)
114
115   # the new block devices  --------------------------
116
117   @staticmethod
118   def perspective_blockdev_create(params):
119     """Create a block device.
120
121     """
122     bdev_s, size, owner, on_primary, info = params
123     bdev = objects.Disk.FromDict(bdev_s)
124     if bdev is None:
125       raise ValueError("can't unserialize data!")
126     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
127
128   @staticmethod
129   def perspective_blockdev_remove(params):
130     """Remove a block device.
131
132     """
133     bdev_s = params[0]
134     bdev = objects.Disk.FromDict(bdev_s)
135     return backend.BlockdevRemove(bdev)
136
137   @staticmethod
138   def perspective_blockdev_rename(params):
139     """Remove a block device.
140
141     """
142     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
143     return backend.BlockdevRename(devlist)
144
145   @staticmethod
146   def perspective_blockdev_assemble(params):
147     """Assemble a block device.
148
149     """
150     bdev_s, owner, on_primary = params
151     bdev = objects.Disk.FromDict(bdev_s)
152     if bdev is None:
153       raise ValueError("can't unserialize data!")
154     return backend.BlockdevAssemble(bdev, owner, on_primary)
155
156   @staticmethod
157   def perspective_blockdev_shutdown(params):
158     """Shutdown a block device.
159
160     """
161     bdev_s = params[0]
162     bdev = objects.Disk.FromDict(bdev_s)
163     if bdev is None:
164       raise ValueError("can't unserialize data!")
165     return backend.BlockdevShutdown(bdev)
166
167   @staticmethod
168   def perspective_blockdev_addchildren(params):
169     """Add a child to a mirror device.
170
171     Note: this is only valid for mirror devices. It's the caller's duty
172     to send a correct disk, otherwise we raise an error.
173
174     """
175     bdev_s, ndev_s = params
176     bdev = objects.Disk.FromDict(bdev_s)
177     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
178     if bdev is None or ndevs.count(None) > 0:
179       raise ValueError("can't unserialize data!")
180     return backend.BlockdevAddchildren(bdev, ndevs)
181
182   @staticmethod
183   def perspective_blockdev_removechildren(params):
184     """Remove a child from a mirror device.
185
186     This is only valid for mirror devices, of course. It's the callers
187     duty to send a correct disk, otherwise we raise an error.
188
189     """
190     bdev_s, ndev_s = params
191     bdev = objects.Disk.FromDict(bdev_s)
192     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
193     if bdev is None or ndevs.count(None) > 0:
194       raise ValueError("can't unserialize data!")
195     return backend.BlockdevRemovechildren(bdev, ndevs)
196
197   @staticmethod
198   def perspective_blockdev_getmirrorstatus(params):
199     """Return the mirror status for a list of disks.
200
201     """
202     disks = [objects.Disk.FromDict(dsk_s)
203              for dsk_s in params]
204     return [status.ToDict()
205             for status in backend.BlockdevGetmirrorstatus(disks)]
206
207   @staticmethod
208   def perspective_blockdev_find(params):
209     """Expose the FindBlockDevice functionality for a disk.
210
211     This will try to find but not activate a disk.
212
213     """
214     disk = objects.Disk.FromDict(params[0])
215
216     result = backend.BlockdevFind(disk)
217     if result is None:
218       return None
219
220     return result.ToDict()
221
222   @staticmethod
223   def perspective_blockdev_snapshot(params):
224     """Create a snapshot device.
225
226     Note that this is only valid for LVM disks, if we get passed
227     something else we raise an exception. The snapshot device can be
228     remove by calling the generic block device remove call.
229
230     """
231     cfbd = objects.Disk.FromDict(params[0])
232     return backend.BlockdevSnapshot(cfbd)
233
234   @staticmethod
235   def perspective_blockdev_grow(params):
236     """Grow a stack of devices.
237
238     """
239     cfbd = objects.Disk.FromDict(params[0])
240     amount = params[1]
241     return backend.BlockdevGrow(cfbd, amount)
242
243   @staticmethod
244   def perspective_blockdev_close(params):
245     """Closes the given block devices.
246
247     """
248     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
249     return backend.BlockdevClose(params[0], disks)
250
251   @staticmethod
252   def perspective_blockdev_getsize(params):
253     """Compute the sizes of the given block devices.
254
255     """
256     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
257     return backend.BlockdevGetsize(disks)
258
259   @staticmethod
260   def perspective_blockdev_export(params):
261     """Compute the sizes of the given block devices.
262
263     """
264     disk = objects.Disk.FromDict(params[0])
265     dest_node, dest_path, cluster_name = params[1:]
266     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
267
268   # blockdev/drbd specific methods ----------
269
270   @staticmethod
271   def perspective_drbd_disconnect_net(params):
272     """Disconnects the network connection of drbd disks.
273
274     Note that this is only valid for drbd disks, so the members of the
275     disk list must all be drbd devices.
276
277     """
278     nodes_ip, disks = params
279     disks = [objects.Disk.FromDict(cf) for cf in disks]
280     return backend.DrbdDisconnectNet(nodes_ip, disks)
281
282   @staticmethod
283   def perspective_drbd_attach_net(params):
284     """Attaches the network connection of drbd disks.
285
286     Note that this is only valid for drbd disks, so the members of the
287     disk list must all be drbd devices.
288
289     """
290     nodes_ip, disks, instance_name, multimaster = params
291     disks = [objects.Disk.FromDict(cf) for cf in disks]
292     return backend.DrbdAttachNet(nodes_ip, disks,
293                                      instance_name, multimaster)
294
295   @staticmethod
296   def perspective_drbd_wait_sync(params):
297     """Wait until DRBD disks are synched.
298
299     Note that this is only valid for drbd disks, so the members of the
300     disk list must all be drbd devices.
301
302     """
303     nodes_ip, disks = params
304     disks = [objects.Disk.FromDict(cf) for cf in disks]
305     return backend.DrbdWaitSync(nodes_ip, disks)
306
307   # export/import  --------------------------
308
309   @staticmethod
310   def perspective_snapshot_export(params):
311     """Export a given snapshot.
312
313     """
314     disk = objects.Disk.FromDict(params[0])
315     dest_node = params[1]
316     instance = objects.Instance.FromDict(params[2])
317     cluster_name = params[3]
318     dev_idx = params[4]
319     return backend.ExportSnapshot(disk, dest_node, instance,
320                                   cluster_name, dev_idx)
321
322   @staticmethod
323   def perspective_finalize_export(params):
324     """Expose the finalize export functionality.
325
326     """
327     instance = objects.Instance.FromDict(params[0])
328     snap_disks = [objects.Disk.FromDict(str_data)
329                   for str_data in params[1]]
330     return backend.FinalizeExport(instance, snap_disks)
331
332   @staticmethod
333   def perspective_export_info(params):
334     """Query information about an existing export on this node.
335
336     The given path may not contain an export, in which case we return
337     None.
338
339     """
340     path = params[0]
341     return backend.ExportInfo(path)
342
343   @staticmethod
344   def perspective_export_list(params):
345     """List the available exports on this node.
346
347     Note that as opposed to export_info, which may query data about an
348     export in any path, this only queries the standard Ganeti path
349     (constants.EXPORT_DIR).
350
351     """
352     return backend.ListExports()
353
354   @staticmethod
355   def perspective_export_remove(params):
356     """Remove an export.
357
358     """
359     export = params[0]
360     return backend.RemoveExport(export)
361
362   # volume  --------------------------
363
364   @staticmethod
365   def perspective_lv_list(params):
366     """Query the list of logical volumes in a given volume group.
367
368     """
369     vgname = params[0]
370     return backend.GetVolumeList(vgname)
371
372   @staticmethod
373   def perspective_vg_list(params):
374     """Query the list of volume groups.
375
376     """
377     return backend.ListVolumeGroups()
378
379   # Storage --------------------------
380
381   @staticmethod
382   def perspective_storage_list(params):
383     """Get list of storage units.
384
385     """
386     (su_name, su_args, name, fields) = params
387     return storage.GetStorage(su_name, *su_args).List(name, fields)
388
389   @staticmethod
390   def perspective_storage_modify(params):
391     """Modify a storage unit.
392
393     """
394     (su_name, su_args, name, changes) = params
395     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
396
397   @staticmethod
398   def perspective_storage_execute(params):
399     """Execute an operation on a storage unit.
400
401     """
402     (su_name, su_args, name, op) = params
403     return storage.GetStorage(su_name, *su_args).Execute(name, op)
404
405   # bridge  --------------------------
406
407   @staticmethod
408   def perspective_bridges_exist(params):
409     """Check if all bridges given exist on this node.
410
411     """
412     bridges_list = params[0]
413     return backend.BridgesExist(bridges_list)
414
415   # instance  --------------------------
416
417   @staticmethod
418   def perspective_instance_os_add(params):
419     """Install an OS on a given instance.
420
421     """
422     inst_s = params[0]
423     inst = objects.Instance.FromDict(inst_s)
424     reinstall = params[1]
425     return backend.InstanceOsAdd(inst, reinstall)
426
427   @staticmethod
428   def perspective_instance_run_rename(params):
429     """Runs the OS rename script for an instance.
430
431     """
432     inst_s, old_name = params
433     inst = objects.Instance.FromDict(inst_s)
434     return backend.RunRenameInstance(inst, old_name)
435
436   @staticmethod
437   def perspective_instance_os_import(params):
438     """Run the import function of an OS onto a given instance.
439
440     """
441     inst_s, src_node, src_images, cluster_name = params
442     inst = objects.Instance.FromDict(inst_s)
443     return backend.ImportOSIntoInstance(inst, src_node, src_images,
444                                         cluster_name)
445
446   @staticmethod
447   def perspective_instance_shutdown(params):
448     """Shutdown an instance.
449
450     """
451     instance = objects.Instance.FromDict(params[0])
452     timeout = params[1]
453     return backend.InstanceShutdown(instance, timeout)
454
455   @staticmethod
456   def perspective_instance_start(params):
457     """Start an instance.
458
459     """
460     instance = objects.Instance.FromDict(params[0])
461     return backend.StartInstance(instance)
462
463   @staticmethod
464   def perspective_migration_info(params):
465     """Gather information about an instance to be migrated.
466
467     """
468     instance = objects.Instance.FromDict(params[0])
469     return backend.MigrationInfo(instance)
470
471   @staticmethod
472   def perspective_accept_instance(params):
473     """Prepare the node to accept an instance.
474
475     """
476     instance, info, target = params
477     instance = objects.Instance.FromDict(instance)
478     return backend.AcceptInstance(instance, info, target)
479
480   @staticmethod
481   def perspective_finalize_migration(params):
482     """Finalize the instance migration.
483
484     """
485     instance, info, success = params
486     instance = objects.Instance.FromDict(instance)
487     return backend.FinalizeMigration(instance, info, success)
488
489   @staticmethod
490   def perspective_instance_migrate(params):
491     """Migrates an instance.
492
493     """
494     instance, target, live = params
495     instance = objects.Instance.FromDict(instance)
496     return backend.MigrateInstance(instance, target, live)
497
498   @staticmethod
499   def perspective_instance_reboot(params):
500     """Reboot an instance.
501
502     """
503     instance = objects.Instance.FromDict(params[0])
504     reboot_type = params[1]
505     return backend.InstanceReboot(instance, reboot_type)
506
507   @staticmethod
508   def perspective_instance_info(params):
509     """Query instance information.
510
511     """
512     return backend.GetInstanceInfo(params[0], params[1])
513
514   @staticmethod
515   def perspective_instance_migratable(params):
516     """Query whether the specified instance can be migrated.
517
518     """
519     instance = objects.Instance.FromDict(params[0])
520     return backend.GetInstanceMigratable(instance)
521
522   @staticmethod
523   def perspective_all_instances_info(params):
524     """Query information about all instances.
525
526     """
527     return backend.GetAllInstancesInfo(params[0])
528
529   @staticmethod
530   def perspective_instance_list(params):
531     """Query the list of running instances.
532
533     """
534     return backend.GetInstanceList(params[0])
535
536   # node --------------------------
537
538   @staticmethod
539   def perspective_node_tcp_ping(params):
540     """Do a TcpPing on the remote node.
541
542     """
543     return utils.TcpPing(params[1], params[2], timeout=params[3],
544                          live_port_needed=params[4], source=params[0])
545
546   @staticmethod
547   def perspective_node_has_ip_address(params):
548     """Checks if a node has the given ip address.
549
550     """
551     return utils.OwnIpAddress(params[0])
552
553   @staticmethod
554   def perspective_node_info(params):
555     """Query node information.
556
557     """
558     vgname, hypervisor_type = params
559     return backend.GetNodeInfo(vgname, hypervisor_type)
560
561   @staticmethod
562   def perspective_node_add(params):
563     """Complete the registration of this node in the cluster.
564
565     """
566     return backend.AddNode(params[0], params[1], params[2],
567                            params[3], params[4], params[5])
568
569   @staticmethod
570   def perspective_node_verify(params):
571     """Run a verify sequence on this node.
572
573     """
574     return backend.VerifyNode(params[0], params[1])
575
576   @staticmethod
577   def perspective_node_start_master(params):
578     """Promote this node to master status.
579
580     """
581     return backend.StartMaster(params[0], params[1])
582
583   @staticmethod
584   def perspective_node_stop_master(params):
585     """Demote this node from master status.
586
587     """
588     return backend.StopMaster(params[0])
589
590   @staticmethod
591   def perspective_node_leave_cluster(params):
592     """Cleanup after leaving a cluster.
593
594     """
595     return backend.LeaveCluster()
596
597   @staticmethod
598   def perspective_node_volumes(params):
599     """Query the list of all logical volume groups.
600
601     """
602     return backend.NodeVolumes()
603
604   @staticmethod
605   def perspective_node_demote_from_mc(params):
606     """Demote a node from the master candidate role.
607
608     """
609     return backend.DemoteFromMC()
610
611
612   @staticmethod
613   def perspective_node_powercycle(params):
614     """Tries to powercycle the nod.
615
616     """
617     hypervisor_type = params[0]
618     return backend.PowercycleNode(hypervisor_type)
619
620
621   # cluster --------------------------
622
623   @staticmethod
624   def perspective_version(params):
625     """Query version information.
626
627     """
628     return constants.PROTOCOL_VERSION
629
630   @staticmethod
631   def perspective_upload_file(params):
632     """Upload a file.
633
634     Note that the backend implementation imposes strict rules on which
635     files are accepted.
636
637     """
638     return backend.UploadFile(*params)
639
640   @staticmethod
641   def perspective_master_info(params):
642     """Query master information.
643
644     """
645     return backend.GetMasterInfo()
646
647   @staticmethod
648   def perspective_write_ssconf_files(params):
649     """Write ssconf files.
650
651     """
652     (values,) = params
653     return backend.WriteSsconfFiles(values)
654
655   # os -----------------------
656
657   @staticmethod
658   def perspective_os_diagnose(params):
659     """Query detailed information about existing OSes.
660
661     """
662     return backend.DiagnoseOS()
663
664   @staticmethod
665   def perspective_os_get(params):
666     """Query information about a given OS.
667
668     """
669     name = params[0]
670     os_obj = backend.OSFromDisk(name)
671     return os_obj.ToDict()
672
673   # hooks -----------------------
674
675   @staticmethod
676   def perspective_hooks_runner(params):
677     """Run hook scripts.
678
679     """
680     hpath, phase, env = params
681     hr = backend.HooksRunner()
682     return hr.RunHooks(hpath, phase, env)
683
684   # iallocator -----------------
685
686   @staticmethod
687   def perspective_iallocator_runner(params):
688     """Run an iallocator script.
689
690     """
691     name, idata = params
692     iar = backend.IAllocatorRunner()
693     return iar.Run(name, idata)
694
695   # test -----------------------
696
697   @staticmethod
698   def perspective_test_delay(params):
699     """Run test delay.
700
701     """
702     duration = params[0]
703     status, rval = utils.TestDelay(duration)
704     if not status:
705       raise backend.RPCFail(rval)
706     return rval
707
708   # file storage ---------------
709
710   @staticmethod
711   def perspective_file_storage_dir_create(params):
712     """Create the file storage directory.
713
714     """
715     file_storage_dir = params[0]
716     return backend.CreateFileStorageDir(file_storage_dir)
717
718   @staticmethod
719   def perspective_file_storage_dir_remove(params):
720     """Remove the file storage directory.
721
722     """
723     file_storage_dir = params[0]
724     return backend.RemoveFileStorageDir(file_storage_dir)
725
726   @staticmethod
727   def perspective_file_storage_dir_rename(params):
728     """Rename the file storage directory.
729
730     """
731     old_file_storage_dir = params[0]
732     new_file_storage_dir = params[1]
733     return backend.RenameFileStorageDir(old_file_storage_dir,
734                                         new_file_storage_dir)
735
736   # jobs ------------------------
737
738   @staticmethod
739   @_RequireJobQueueLock
740   def perspective_jobqueue_update(params):
741     """Update job queue.
742
743     """
744     (file_name, content) = params
745     return backend.JobQueueUpdate(file_name, content)
746
747   @staticmethod
748   @_RequireJobQueueLock
749   def perspective_jobqueue_purge(params):
750     """Purge job queue.
751
752     """
753     return backend.JobQueuePurge()
754
755   @staticmethod
756   @_RequireJobQueueLock
757   def perspective_jobqueue_rename(params):
758     """Rename a job queue file.
759
760     """
761     # TODO: What if a file fails to rename?
762     return [backend.JobQueueRename(old, new) for old, new in params]
763
764   @staticmethod
765   def perspective_jobqueue_set_drain(params):
766     """Set/unset the queue drain flag.
767
768     """
769     drain_flag = params[0]
770     return backend.JobQueueSetDrainFlag(drain_flag)
771
772
773   # hypervisor ---------------
774
775   @staticmethod
776   def perspective_hypervisor_validate_params(params):
777     """Validate the hypervisor parameters.
778
779     """
780     (hvname, hvparams) = params
781     return backend.ValidateHVParams(hvname, hvparams)
782
783
784 def ExecNoded(options, args):
785   """Main node daemon function, executed with the PID file held.
786
787   """
788   global queue_lock
789
790   # Read SSL certificate
791   if options.ssl:
792     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
793                                     ssl_cert_path=options.ssl_cert)
794   else:
795     ssl_params = None
796
797   # Prepare job queue
798   queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
799
800   mainloop = daemon.Mainloop()
801   server = NodeHttpServer(mainloop, options.bind_address, options.port,
802                           ssl_params=ssl_params, ssl_verify_peer=True)
803   server.Start()
804   try:
805     mainloop.Run()
806   finally:
807     server.Stop()
808
809
810 def main():
811   """Main function for the node daemon.
812
813   """
814   parser = OptionParser(description="Ganeti node daemon",
815                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
816                         version="%%prog (ganeti) %s" %
817                         constants.RELEASE_VERSION)
818   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
819   dirs.append((constants.LOG_OS_DIR, 0750))
820   dirs.append((constants.LOCK_DIR, 1777))
821   daemon.GenericMain(constants.NODED, parser, dirs, None, ExecNoded)
822
823
824 if __name__ == '__main__':
825   main()