Merge remote branch 'origin/stable-2.1' into devel-2.1
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable-msg=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36
37 from optparse import OptionParser
38
39 from ganeti import backend
40 from ganeti import constants
41 from ganeti import objects
42 from ganeti import errors
43 from ganeti import jstore
44 from ganeti import daemon
45 from ganeti import http
46 from ganeti import utils
47 from ganeti import storage
48
49 import ganeti.http.server # pylint: disable-msg=W0611
50
51
52 queue_lock = None
53
54
55 def _PrepareQueueLock():
56   """Try to prepare the queue lock.
57
58   @return: None for success, otherwise an exception object
59
60   """
61   global queue_lock # pylint: disable-msg=W0603
62
63   if queue_lock is not None:
64     return None
65
66   # Prepare job queue
67   try:
68     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
69     return None
70   except EnvironmentError, err:
71     return err
72
73
74 def _RequireJobQueueLock(fn):
75   """Decorator for job queue manipulating functions.
76
77   """
78   QUEUE_LOCK_TIMEOUT = 10
79
80   def wrapper(*args, **kwargs):
81     # Locking in exclusive, blocking mode because there could be several
82     # children running at the same time. Waiting up to 10 seconds.
83     if _PrepareQueueLock() is not None:
84       raise errors.JobQueueError("Job queue failed initialization,"
85                                  " cannot update jobs")
86     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
87     try:
88       return fn(*args, **kwargs)
89     finally:
90       queue_lock.Unlock()
91
92   return wrapper
93
94
95 class NodeHttpServer(http.server.HttpServer):
96   """The server implementation.
97
98   This class holds all methods exposed over the RPC interface.
99
100   """
101   # too many public methods, and unused args - all methods get params
102   # due to the API
103   # pylint: disable-msg=R0904,W0613
104   def __init__(self, *args, **kwargs):
105     http.server.HttpServer.__init__(self, *args, **kwargs)
106     self.noded_pid = os.getpid()
107
108   def HandleRequest(self, req):
109     """Handle a request.
110
111     """
112     if req.request_method.upper() != http.HTTP_PUT:
113       raise http.HttpBadRequest()
114
115     path = req.request_path
116     if path.startswith("/"):
117       path = path[1:]
118
119     method = getattr(self, "perspective_%s" % path, None)
120     if method is None:
121       raise http.HttpNotFound()
122
123     try:
124       rvalue = method(req.request_body)
125       return True, rvalue
126
127     except backend.RPCFail, err:
128       # our custom failure exception; str(err) works fine if the
129       # exception was constructed with a single argument, and in
130       # this case, err.message == err.args[0] == str(err)
131       return (False, str(err))
132     except errors.QuitGanetiException, err:
133       # Tell parent to quit
134       logging.info("Shutting down the node daemon, arguments: %s",
135                    str(err.args))
136       os.kill(self.noded_pid, signal.SIGTERM)
137       # And return the error's arguments, which must be already in
138       # correct tuple format
139       return err.args
140     except Exception, err:
141       logging.exception("Error in RPC call")
142       return False, "Error while executing backend function: %s" % str(err)
143
144   # the new block devices  --------------------------
145
146   @staticmethod
147   def perspective_blockdev_create(params):
148     """Create a block device.
149
150     """
151     bdev_s, size, owner, on_primary, info = params
152     bdev = objects.Disk.FromDict(bdev_s)
153     if bdev is None:
154       raise ValueError("can't unserialize data!")
155     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
156
157   @staticmethod
158   def perspective_blockdev_remove(params):
159     """Remove a block device.
160
161     """
162     bdev_s = params[0]
163     bdev = objects.Disk.FromDict(bdev_s)
164     return backend.BlockdevRemove(bdev)
165
166   @staticmethod
167   def perspective_blockdev_rename(params):
168     """Remove a block device.
169
170     """
171     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
172     return backend.BlockdevRename(devlist)
173
174   @staticmethod
175   def perspective_blockdev_assemble(params):
176     """Assemble a block device.
177
178     """
179     bdev_s, owner, on_primary = params
180     bdev = objects.Disk.FromDict(bdev_s)
181     if bdev is None:
182       raise ValueError("can't unserialize data!")
183     return backend.BlockdevAssemble(bdev, owner, on_primary)
184
185   @staticmethod
186   def perspective_blockdev_shutdown(params):
187     """Shutdown a block device.
188
189     """
190     bdev_s = params[0]
191     bdev = objects.Disk.FromDict(bdev_s)
192     if bdev is None:
193       raise ValueError("can't unserialize data!")
194     return backend.BlockdevShutdown(bdev)
195
196   @staticmethod
197   def perspective_blockdev_addchildren(params):
198     """Add a child to a mirror device.
199
200     Note: this is only valid for mirror devices. It's the caller's duty
201     to send a correct disk, otherwise we raise an error.
202
203     """
204     bdev_s, ndev_s = params
205     bdev = objects.Disk.FromDict(bdev_s)
206     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
207     if bdev is None or ndevs.count(None) > 0:
208       raise ValueError("can't unserialize data!")
209     return backend.BlockdevAddchildren(bdev, ndevs)
210
211   @staticmethod
212   def perspective_blockdev_removechildren(params):
213     """Remove a child from a mirror device.
214
215     This is only valid for mirror devices, of course. It's the callers
216     duty to send a correct disk, otherwise we raise an error.
217
218     """
219     bdev_s, ndev_s = params
220     bdev = objects.Disk.FromDict(bdev_s)
221     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
222     if bdev is None or ndevs.count(None) > 0:
223       raise ValueError("can't unserialize data!")
224     return backend.BlockdevRemovechildren(bdev, ndevs)
225
226   @staticmethod
227   def perspective_blockdev_getmirrorstatus(params):
228     """Return the mirror status for a list of disks.
229
230     """
231     disks = [objects.Disk.FromDict(dsk_s)
232              for dsk_s in params]
233     return [status.ToDict()
234             for status in backend.BlockdevGetmirrorstatus(disks)]
235
236   @staticmethod
237   def perspective_blockdev_find(params):
238     """Expose the FindBlockDevice functionality for a disk.
239
240     This will try to find but not activate a disk.
241
242     """
243     disk = objects.Disk.FromDict(params[0])
244
245     result = backend.BlockdevFind(disk)
246     if result is None:
247       return None
248
249     return result.ToDict()
250
251   @staticmethod
252   def perspective_blockdev_snapshot(params):
253     """Create a snapshot device.
254
255     Note that this is only valid for LVM disks, if we get passed
256     something else we raise an exception. The snapshot device can be
257     remove by calling the generic block device remove call.
258
259     """
260     cfbd = objects.Disk.FromDict(params[0])
261     return backend.BlockdevSnapshot(cfbd)
262
263   @staticmethod
264   def perspective_blockdev_grow(params):
265     """Grow a stack of devices.
266
267     """
268     cfbd = objects.Disk.FromDict(params[0])
269     amount = params[1]
270     return backend.BlockdevGrow(cfbd, amount)
271
272   @staticmethod
273   def perspective_blockdev_close(params):
274     """Closes the given block devices.
275
276     """
277     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
278     return backend.BlockdevClose(params[0], disks)
279
280   @staticmethod
281   def perspective_blockdev_getsize(params):
282     """Compute the sizes of the given block devices.
283
284     """
285     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
286     return backend.BlockdevGetsize(disks)
287
288   @staticmethod
289   def perspective_blockdev_export(params):
290     """Compute the sizes of the given block devices.
291
292     """
293     disk = objects.Disk.FromDict(params[0])
294     dest_node, dest_path, cluster_name = params[1:]
295     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
296
297   # blockdev/drbd specific methods ----------
298
299   @staticmethod
300   def perspective_drbd_disconnect_net(params):
301     """Disconnects the network connection of drbd disks.
302
303     Note that this is only valid for drbd disks, so the members of the
304     disk list must all be drbd devices.
305
306     """
307     nodes_ip, disks = params
308     disks = [objects.Disk.FromDict(cf) for cf in disks]
309     return backend.DrbdDisconnectNet(nodes_ip, disks)
310
311   @staticmethod
312   def perspective_drbd_attach_net(params):
313     """Attaches the network connection of drbd disks.
314
315     Note that this is only valid for drbd disks, so the members of the
316     disk list must all be drbd devices.
317
318     """
319     nodes_ip, disks, instance_name, multimaster = params
320     disks = [objects.Disk.FromDict(cf) for cf in disks]
321     return backend.DrbdAttachNet(nodes_ip, disks,
322                                      instance_name, multimaster)
323
324   @staticmethod
325   def perspective_drbd_wait_sync(params):
326     """Wait until DRBD disks are synched.
327
328     Note that this is only valid for drbd disks, so the members of the
329     disk list must all be drbd devices.
330
331     """
332     nodes_ip, disks = params
333     disks = [objects.Disk.FromDict(cf) for cf in disks]
334     return backend.DrbdWaitSync(nodes_ip, disks)
335
336   # export/import  --------------------------
337
338   @staticmethod
339   def perspective_snapshot_export(params):
340     """Export a given snapshot.
341
342     """
343     disk = objects.Disk.FromDict(params[0])
344     dest_node = params[1]
345     instance = objects.Instance.FromDict(params[2])
346     cluster_name = params[3]
347     dev_idx = params[4]
348     return backend.ExportSnapshot(disk, dest_node, instance,
349                                   cluster_name, dev_idx)
350
351   @staticmethod
352   def perspective_finalize_export(params):
353     """Expose the finalize export functionality.
354
355     """
356     instance = objects.Instance.FromDict(params[0])
357     snap_disks = [objects.Disk.FromDict(str_data)
358                   for str_data in params[1]]
359     return backend.FinalizeExport(instance, snap_disks)
360
361   @staticmethod
362   def perspective_export_info(params):
363     """Query information about an existing export on this node.
364
365     The given path may not contain an export, in which case we return
366     None.
367
368     """
369     path = params[0]
370     return backend.ExportInfo(path)
371
372   @staticmethod
373   def perspective_export_list(params):
374     """List the available exports on this node.
375
376     Note that as opposed to export_info, which may query data about an
377     export in any path, this only queries the standard Ganeti path
378     (constants.EXPORT_DIR).
379
380     """
381     return backend.ListExports()
382
383   @staticmethod
384   def perspective_export_remove(params):
385     """Remove an export.
386
387     """
388     export = params[0]
389     return backend.RemoveExport(export)
390
391   # volume  --------------------------
392
393   @staticmethod
394   def perspective_lv_list(params):
395     """Query the list of logical volumes in a given volume group.
396
397     """
398     vgname = params[0]
399     return backend.GetVolumeList(vgname)
400
401   @staticmethod
402   def perspective_vg_list(params):
403     """Query the list of volume groups.
404
405     """
406     return backend.ListVolumeGroups()
407
408   # Storage --------------------------
409
410   @staticmethod
411   def perspective_storage_list(params):
412     """Get list of storage units.
413
414     """
415     (su_name, su_args, name, fields) = params
416     return storage.GetStorage(su_name, *su_args).List(name, fields)
417
418   @staticmethod
419   def perspective_storage_modify(params):
420     """Modify a storage unit.
421
422     """
423     (su_name, su_args, name, changes) = params
424     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
425
426   @staticmethod
427   def perspective_storage_execute(params):
428     """Execute an operation on a storage unit.
429
430     """
431     (su_name, su_args, name, op) = params
432     return storage.GetStorage(su_name, *su_args).Execute(name, op)
433
434   # bridge  --------------------------
435
436   @staticmethod
437   def perspective_bridges_exist(params):
438     """Check if all bridges given exist on this node.
439
440     """
441     bridges_list = params[0]
442     return backend.BridgesExist(bridges_list)
443
444   # instance  --------------------------
445
446   @staticmethod
447   def perspective_instance_os_add(params):
448     """Install an OS on a given instance.
449
450     """
451     inst_s = params[0]
452     inst = objects.Instance.FromDict(inst_s)
453     reinstall = params[1]
454     return backend.InstanceOsAdd(inst, reinstall)
455
456   @staticmethod
457   def perspective_instance_run_rename(params):
458     """Runs the OS rename script for an instance.
459
460     """
461     inst_s, old_name = params
462     inst = objects.Instance.FromDict(inst_s)
463     return backend.RunRenameInstance(inst, old_name)
464
465   @staticmethod
466   def perspective_instance_os_import(params):
467     """Run the import function of an OS onto a given instance.
468
469     """
470     inst_s, src_node, src_images, cluster_name = params
471     inst = objects.Instance.FromDict(inst_s)
472     return backend.ImportOSIntoInstance(inst, src_node, src_images,
473                                         cluster_name)
474
475   @staticmethod
476   def perspective_instance_shutdown(params):
477     """Shutdown an instance.
478
479     """
480     instance = objects.Instance.FromDict(params[0])
481     timeout = params[1]
482     return backend.InstanceShutdown(instance, timeout)
483
484   @staticmethod
485   def perspective_instance_start(params):
486     """Start an instance.
487
488     """
489     instance = objects.Instance.FromDict(params[0])
490     return backend.StartInstance(instance)
491
492   @staticmethod
493   def perspective_migration_info(params):
494     """Gather information about an instance to be migrated.
495
496     """
497     instance = objects.Instance.FromDict(params[0])
498     return backend.MigrationInfo(instance)
499
500   @staticmethod
501   def perspective_accept_instance(params):
502     """Prepare the node to accept an instance.
503
504     """
505     instance, info, target = params
506     instance = objects.Instance.FromDict(instance)
507     return backend.AcceptInstance(instance, info, target)
508
509   @staticmethod
510   def perspective_finalize_migration(params):
511     """Finalize the instance migration.
512
513     """
514     instance, info, success = params
515     instance = objects.Instance.FromDict(instance)
516     return backend.FinalizeMigration(instance, info, success)
517
518   @staticmethod
519   def perspective_instance_migrate(params):
520     """Migrates an instance.
521
522     """
523     instance, target, live = params
524     instance = objects.Instance.FromDict(instance)
525     return backend.MigrateInstance(instance, target, live)
526
527   @staticmethod
528   def perspective_instance_reboot(params):
529     """Reboot an instance.
530
531     """
532     instance = objects.Instance.FromDict(params[0])
533     reboot_type = params[1]
534     shutdown_timeout = params[2]
535     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
536
537   @staticmethod
538   def perspective_instance_info(params):
539     """Query instance information.
540
541     """
542     return backend.GetInstanceInfo(params[0], params[1])
543
544   @staticmethod
545   def perspective_instance_migratable(params):
546     """Query whether the specified instance can be migrated.
547
548     """
549     instance = objects.Instance.FromDict(params[0])
550     return backend.GetInstanceMigratable(instance)
551
552   @staticmethod
553   def perspective_all_instances_info(params):
554     """Query information about all instances.
555
556     """
557     return backend.GetAllInstancesInfo(params[0])
558
559   @staticmethod
560   def perspective_instance_list(params):
561     """Query the list of running instances.
562
563     """
564     return backend.GetInstanceList(params[0])
565
566   # node --------------------------
567
568   @staticmethod
569   def perspective_node_tcp_ping(params):
570     """Do a TcpPing on the remote node.
571
572     """
573     return utils.TcpPing(params[1], params[2], timeout=params[3],
574                          live_port_needed=params[4], source=params[0])
575
576   @staticmethod
577   def perspective_node_has_ip_address(params):
578     """Checks if a node has the given ip address.
579
580     """
581     return utils.OwnIpAddress(params[0])
582
583   @staticmethod
584   def perspective_node_info(params):
585     """Query node information.
586
587     """
588     vgname, hypervisor_type = params
589     return backend.GetNodeInfo(vgname, hypervisor_type)
590
591   @staticmethod
592   def perspective_node_add(params):
593     """Complete the registration of this node in the cluster.
594
595     """
596     return backend.AddNode(params[0], params[1], params[2],
597                            params[3], params[4], params[5])
598
599   @staticmethod
600   def perspective_node_verify(params):
601     """Run a verify sequence on this node.
602
603     """
604     return backend.VerifyNode(params[0], params[1])
605
606   @staticmethod
607   def perspective_node_start_master(params):
608     """Promote this node to master status.
609
610     """
611     return backend.StartMaster(params[0], params[1])
612
613   @staticmethod
614   def perspective_node_stop_master(params):
615     """Demote this node from master status.
616
617     """
618     return backend.StopMaster(params[0])
619
620   @staticmethod
621   def perspective_node_leave_cluster(params):
622     """Cleanup after leaving a cluster.
623
624     """
625     return backend.LeaveCluster(params[0])
626
627   @staticmethod
628   def perspective_node_volumes(params):
629     """Query the list of all logical volume groups.
630
631     """
632     return backend.NodeVolumes()
633
634   @staticmethod
635   def perspective_node_demote_from_mc(params):
636     """Demote a node from the master candidate role.
637
638     """
639     return backend.DemoteFromMC()
640
641
642   @staticmethod
643   def perspective_node_powercycle(params):
644     """Tries to powercycle the nod.
645
646     """
647     hypervisor_type = params[0]
648     return backend.PowercycleNode(hypervisor_type)
649
650
651   # cluster --------------------------
652
653   @staticmethod
654   def perspective_version(params):
655     """Query version information.
656
657     """
658     return constants.PROTOCOL_VERSION
659
660   @staticmethod
661   def perspective_upload_file(params):
662     """Upload a file.
663
664     Note that the backend implementation imposes strict rules on which
665     files are accepted.
666
667     """
668     return backend.UploadFile(*params)
669
670   @staticmethod
671   def perspective_master_info(params):
672     """Query master information.
673
674     """
675     return backend.GetMasterInfo()
676
677   @staticmethod
678   def perspective_write_ssconf_files(params):
679     """Write ssconf files.
680
681     """
682     (values,) = params
683     return backend.WriteSsconfFiles(values)
684
685   # os -----------------------
686
687   @staticmethod
688   def perspective_os_diagnose(params):
689     """Query detailed information about existing OSes.
690
691     """
692     return backend.DiagnoseOS()
693
694   @staticmethod
695   def perspective_os_get(params):
696     """Query information about a given OS.
697
698     """
699     name = params[0]
700     os_obj = backend.OSFromDisk(name)
701     return os_obj.ToDict()
702
703   # hooks -----------------------
704
705   @staticmethod
706   def perspective_hooks_runner(params):
707     """Run hook scripts.
708
709     """
710     hpath, phase, env = params
711     hr = backend.HooksRunner()
712     return hr.RunHooks(hpath, phase, env)
713
714   # iallocator -----------------
715
716   @staticmethod
717   def perspective_iallocator_runner(params):
718     """Run an iallocator script.
719
720     """
721     name, idata = params
722     iar = backend.IAllocatorRunner()
723     return iar.Run(name, idata)
724
725   # test -----------------------
726
727   @staticmethod
728   def perspective_test_delay(params):
729     """Run test delay.
730
731     """
732     duration = params[0]
733     status, rval = utils.TestDelay(duration)
734     if not status:
735       raise backend.RPCFail(rval)
736     return rval
737
738   # file storage ---------------
739
740   @staticmethod
741   def perspective_file_storage_dir_create(params):
742     """Create the file storage directory.
743
744     """
745     file_storage_dir = params[0]
746     return backend.CreateFileStorageDir(file_storage_dir)
747
748   @staticmethod
749   def perspective_file_storage_dir_remove(params):
750     """Remove the file storage directory.
751
752     """
753     file_storage_dir = params[0]
754     return backend.RemoveFileStorageDir(file_storage_dir)
755
756   @staticmethod
757   def perspective_file_storage_dir_rename(params):
758     """Rename the file storage directory.
759
760     """
761     old_file_storage_dir = params[0]
762     new_file_storage_dir = params[1]
763     return backend.RenameFileStorageDir(old_file_storage_dir,
764                                         new_file_storage_dir)
765
766   # jobs ------------------------
767
768   @staticmethod
769   @_RequireJobQueueLock
770   def perspective_jobqueue_update(params):
771     """Update job queue.
772
773     """
774     (file_name, content) = params
775     return backend.JobQueueUpdate(file_name, content)
776
777   @staticmethod
778   @_RequireJobQueueLock
779   def perspective_jobqueue_purge(params):
780     """Purge job queue.
781
782     """
783     return backend.JobQueuePurge()
784
785   @staticmethod
786   @_RequireJobQueueLock
787   def perspective_jobqueue_rename(params):
788     """Rename a job queue file.
789
790     """
791     # TODO: What if a file fails to rename?
792     return [backend.JobQueueRename(old, new) for old, new in params]
793
794   @staticmethod
795   def perspective_jobqueue_set_drain(params):
796     """Set/unset the queue drain flag.
797
798     """
799     drain_flag = params[0]
800     return backend.JobQueueSetDrainFlag(drain_flag)
801
802
803   # hypervisor ---------------
804
805   @staticmethod
806   def perspective_hypervisor_validate_params(params):
807     """Validate the hypervisor parameters.
808
809     """
810     (hvname, hvparams) = params
811     return backend.ValidateHVParams(hvname, hvparams)
812
813
814 def CheckNoded(_, args):
815   """Initial checks whether to run or exit with a failure.
816
817   """
818   if args: # noded doesn't take any arguments
819     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
820                           sys.argv[0])
821     sys.exit(constants.EXIT_FAILURE)
822
823
824 def ExecNoded(options, _):
825   """Main node daemon function, executed with the PID file held.
826
827   """
828   # Read SSL certificate
829   if options.ssl:
830     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
831                                     ssl_cert_path=options.ssl_cert)
832   else:
833     ssl_params = None
834
835   err = _PrepareQueueLock()
836   if err is not None:
837     # this might be some kind of file-system/permission error; while
838     # this breaks the job queue functionality, we shouldn't prevent
839     # startup of the whole node daemon because of this
840     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
841
842   mainloop = daemon.Mainloop()
843   server = NodeHttpServer(mainloop, options.bind_address, options.port,
844                           ssl_params=ssl_params, ssl_verify_peer=True)
845   server.Start()
846   try:
847     mainloop.Run()
848   finally:
849     server.Stop()
850
851
852 def main():
853   """Main function for the node daemon.
854
855   """
856   parser = OptionParser(description="Ganeti node daemon",
857                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
858                         version="%%prog (ganeti) %s" %
859                         constants.RELEASE_VERSION)
860   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
861   dirs.append((constants.LOG_OS_DIR, 0750))
862   dirs.append((constants.LOCK_DIR, 1777))
863   daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded)
864
865
866 if __name__ == '__main__':
867   main()