Modify gnt-node add to call external script
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable-msg=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36
37 from optparse import OptionParser
38
39 from ganeti import backend
40 from ganeti import constants
41 from ganeti import objects
42 from ganeti import errors
43 from ganeti import jstore
44 from ganeti import daemon
45 from ganeti import http
46 from ganeti import utils
47 from ganeti import storage
48 from ganeti import serializer
49 from ganeti import netutils
50
51 import ganeti.http.server # pylint: disable-msg=W0611
52
53
54 queue_lock = None
55
56
57 def _PrepareQueueLock():
58   """Try to prepare the queue lock.
59
60   @return: None for success, otherwise an exception object
61
62   """
63   global queue_lock # pylint: disable-msg=W0603
64
65   if queue_lock is not None:
66     return None
67
68   # Prepare job queue
69   try:
70     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
71     return None
72   except EnvironmentError, err:
73     return err
74
75
76 def _RequireJobQueueLock(fn):
77   """Decorator for job queue manipulating functions.
78
79   """
80   QUEUE_LOCK_TIMEOUT = 10
81
82   def wrapper(*args, **kwargs):
83     # Locking in exclusive, blocking mode because there could be several
84     # children running at the same time. Waiting up to 10 seconds.
85     if _PrepareQueueLock() is not None:
86       raise errors.JobQueueError("Job queue failed initialization,"
87                                  " cannot update jobs")
88     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
89     try:
90       return fn(*args, **kwargs)
91     finally:
92       queue_lock.Unlock()
93
94   return wrapper
95
96
97 def _DecodeImportExportIO(ieio, ieioargs):
98   """Decodes import/export I/O information.
99
100   """
101   if ieio == constants.IEIO_RAW_DISK:
102     assert len(ieioargs) == 1
103     return (objects.Disk.FromDict(ieioargs[0]), )
104
105   if ieio == constants.IEIO_SCRIPT:
106     assert len(ieioargs) == 2
107     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
108
109   return ieioargs
110
111
112 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
113   """Custom Request Executor class that ensures NodeHttpServer children are
114   locked in ram.
115
116   """
117   def __init__(self, *args, **kwargs):
118     utils.Mlockall()
119
120     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
121
122
123 class NodeHttpServer(http.server.HttpServer):
124   """The server implementation.
125
126   This class holds all methods exposed over the RPC interface.
127
128   """
129   # too many public methods, and unused args - all methods get params
130   # due to the API
131   # pylint: disable-msg=R0904,W0613
132   def __init__(self, *args, **kwargs):
133     http.server.HttpServer.__init__(self, *args, **kwargs)
134     self.noded_pid = os.getpid()
135
136   def HandleRequest(self, req):
137     """Handle a request.
138
139     """
140     if req.request_method.upper() != http.HTTP_PUT:
141       raise http.HttpBadRequest()
142
143     path = req.request_path
144     if path.startswith("/"):
145       path = path[1:]
146
147     method = getattr(self, "perspective_%s" % path, None)
148     if method is None:
149       raise http.HttpNotFound()
150
151     try:
152       result = (True, method(serializer.LoadJson(req.request_body)))
153
154     except backend.RPCFail, err:
155       # our custom failure exception; str(err) works fine if the
156       # exception was constructed with a single argument, and in
157       # this case, err.message == err.args[0] == str(err)
158       result = (False, str(err))
159     except errors.QuitGanetiException, err:
160       # Tell parent to quit
161       logging.info("Shutting down the node daemon, arguments: %s",
162                    str(err.args))
163       os.kill(self.noded_pid, signal.SIGTERM)
164       # And return the error's arguments, which must be already in
165       # correct tuple format
166       result = err.args
167     except Exception, err:
168       logging.exception("Error in RPC call")
169       result = (False, "Error while executing backend function: %s" % str(err))
170
171     return serializer.DumpJson(result, indent=False)
172
173   # the new block devices  --------------------------
174
175   @staticmethod
176   def perspective_blockdev_create(params):
177     """Create a block device.
178
179     """
180     bdev_s, size, owner, on_primary, info = params
181     bdev = objects.Disk.FromDict(bdev_s)
182     if bdev is None:
183       raise ValueError("can't unserialize data!")
184     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
185
186   @staticmethod
187   def perspective_blockdev_remove(params):
188     """Remove a block device.
189
190     """
191     bdev_s = params[0]
192     bdev = objects.Disk.FromDict(bdev_s)
193     return backend.BlockdevRemove(bdev)
194
195   @staticmethod
196   def perspective_blockdev_rename(params):
197     """Remove a block device.
198
199     """
200     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
201     return backend.BlockdevRename(devlist)
202
203   @staticmethod
204   def perspective_blockdev_assemble(params):
205     """Assemble a block device.
206
207     """
208     bdev_s, owner, on_primary = params
209     bdev = objects.Disk.FromDict(bdev_s)
210     if bdev is None:
211       raise ValueError("can't unserialize data!")
212     return backend.BlockdevAssemble(bdev, owner, on_primary)
213
214   @staticmethod
215   def perspective_blockdev_shutdown(params):
216     """Shutdown a block device.
217
218     """
219     bdev_s = params[0]
220     bdev = objects.Disk.FromDict(bdev_s)
221     if bdev is None:
222       raise ValueError("can't unserialize data!")
223     return backend.BlockdevShutdown(bdev)
224
225   @staticmethod
226   def perspective_blockdev_addchildren(params):
227     """Add a child to a mirror device.
228
229     Note: this is only valid for mirror devices. It's the caller's duty
230     to send a correct disk, otherwise we raise an error.
231
232     """
233     bdev_s, ndev_s = params
234     bdev = objects.Disk.FromDict(bdev_s)
235     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
236     if bdev is None or ndevs.count(None) > 0:
237       raise ValueError("can't unserialize data!")
238     return backend.BlockdevAddchildren(bdev, ndevs)
239
240   @staticmethod
241   def perspective_blockdev_removechildren(params):
242     """Remove a child from a mirror device.
243
244     This is only valid for mirror devices, of course. It's the callers
245     duty to send a correct disk, otherwise we raise an error.
246
247     """
248     bdev_s, ndev_s = params
249     bdev = objects.Disk.FromDict(bdev_s)
250     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
251     if bdev is None or ndevs.count(None) > 0:
252       raise ValueError("can't unserialize data!")
253     return backend.BlockdevRemovechildren(bdev, ndevs)
254
255   @staticmethod
256   def perspective_blockdev_getmirrorstatus(params):
257     """Return the mirror status for a list of disks.
258
259     """
260     disks = [objects.Disk.FromDict(dsk_s)
261              for dsk_s in params]
262     return [status.ToDict()
263             for status in backend.BlockdevGetmirrorstatus(disks)]
264
265   @staticmethod
266   def perspective_blockdev_find(params):
267     """Expose the FindBlockDevice functionality for a disk.
268
269     This will try to find but not activate a disk.
270
271     """
272     disk = objects.Disk.FromDict(params[0])
273
274     result = backend.BlockdevFind(disk)
275     if result is None:
276       return None
277
278     return result.ToDict()
279
280   @staticmethod
281   def perspective_blockdev_snapshot(params):
282     """Create a snapshot device.
283
284     Note that this is only valid for LVM disks, if we get passed
285     something else we raise an exception. The snapshot device can be
286     remove by calling the generic block device remove call.
287
288     """
289     cfbd = objects.Disk.FromDict(params[0])
290     return backend.BlockdevSnapshot(cfbd)
291
292   @staticmethod
293   def perspective_blockdev_grow(params):
294     """Grow a stack of devices.
295
296     """
297     cfbd = objects.Disk.FromDict(params[0])
298     amount = params[1]
299     return backend.BlockdevGrow(cfbd, amount)
300
301   @staticmethod
302   def perspective_blockdev_close(params):
303     """Closes the given block devices.
304
305     """
306     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
307     return backend.BlockdevClose(params[0], disks)
308
309   @staticmethod
310   def perspective_blockdev_getsize(params):
311     """Compute the sizes of the given block devices.
312
313     """
314     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
315     return backend.BlockdevGetsize(disks)
316
317   @staticmethod
318   def perspective_blockdev_export(params):
319     """Compute the sizes of the given block devices.
320
321     """
322     disk = objects.Disk.FromDict(params[0])
323     dest_node, dest_path, cluster_name = params[1:]
324     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
325
326   # blockdev/drbd specific methods ----------
327
328   @staticmethod
329   def perspective_drbd_disconnect_net(params):
330     """Disconnects the network connection of drbd disks.
331
332     Note that this is only valid for drbd disks, so the members of the
333     disk list must all be drbd devices.
334
335     """
336     nodes_ip, disks = params
337     disks = [objects.Disk.FromDict(cf) for cf in disks]
338     return backend.DrbdDisconnectNet(nodes_ip, disks)
339
340   @staticmethod
341   def perspective_drbd_attach_net(params):
342     """Attaches the network connection of drbd disks.
343
344     Note that this is only valid for drbd disks, so the members of the
345     disk list must all be drbd devices.
346
347     """
348     nodes_ip, disks, instance_name, multimaster = params
349     disks = [objects.Disk.FromDict(cf) for cf in disks]
350     return backend.DrbdAttachNet(nodes_ip, disks,
351                                      instance_name, multimaster)
352
353   @staticmethod
354   def perspective_drbd_wait_sync(params):
355     """Wait until DRBD disks are synched.
356
357     Note that this is only valid for drbd disks, so the members of the
358     disk list must all be drbd devices.
359
360     """
361     nodes_ip, disks = params
362     disks = [objects.Disk.FromDict(cf) for cf in disks]
363     return backend.DrbdWaitSync(nodes_ip, disks)
364
365   @staticmethod
366   def perspective_drbd_helper(params):
367     """Query drbd helper.
368
369     """
370     return backend.GetDrbdUsermodeHelper()
371
372   # export/import  --------------------------
373
374   @staticmethod
375   def perspective_finalize_export(params):
376     """Expose the finalize export functionality.
377
378     """
379     instance = objects.Instance.FromDict(params[0])
380
381     snap_disks = []
382     for disk in params[1]:
383       if isinstance(disk, bool):
384         snap_disks.append(disk)
385       else:
386         snap_disks.append(objects.Disk.FromDict(disk))
387
388     return backend.FinalizeExport(instance, snap_disks)
389
390   @staticmethod
391   def perspective_export_info(params):
392     """Query information about an existing export on this node.
393
394     The given path may not contain an export, in which case we return
395     None.
396
397     """
398     path = params[0]
399     return backend.ExportInfo(path)
400
401   @staticmethod
402   def perspective_export_list(params):
403     """List the available exports on this node.
404
405     Note that as opposed to export_info, which may query data about an
406     export in any path, this only queries the standard Ganeti path
407     (constants.EXPORT_DIR).
408
409     """
410     return backend.ListExports()
411
412   @staticmethod
413   def perspective_export_remove(params):
414     """Remove an export.
415
416     """
417     export = params[0]
418     return backend.RemoveExport(export)
419
420   # volume  --------------------------
421
422   @staticmethod
423   def perspective_lv_list(params):
424     """Query the list of logical volumes in a given volume group.
425
426     """
427     vgname = params[0]
428     return backend.GetVolumeList(vgname)
429
430   @staticmethod
431   def perspective_vg_list(params):
432     """Query the list of volume groups.
433
434     """
435     return backend.ListVolumeGroups()
436
437   # Storage --------------------------
438
439   @staticmethod
440   def perspective_storage_list(params):
441     """Get list of storage units.
442
443     """
444     (su_name, su_args, name, fields) = params
445     return storage.GetStorage(su_name, *su_args).List(name, fields)
446
447   @staticmethod
448   def perspective_storage_modify(params):
449     """Modify a storage unit.
450
451     """
452     (su_name, su_args, name, changes) = params
453     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
454
455   @staticmethod
456   def perspective_storage_execute(params):
457     """Execute an operation on a storage unit.
458
459     """
460     (su_name, su_args, name, op) = params
461     return storage.GetStorage(su_name, *su_args).Execute(name, op)
462
463   # bridge  --------------------------
464
465   @staticmethod
466   def perspective_bridges_exist(params):
467     """Check if all bridges given exist on this node.
468
469     """
470     bridges_list = params[0]
471     return backend.BridgesExist(bridges_list)
472
473   # instance  --------------------------
474
475   @staticmethod
476   def perspective_instance_os_add(params):
477     """Install an OS on a given instance.
478
479     """
480     inst_s = params[0]
481     inst = objects.Instance.FromDict(inst_s)
482     reinstall = params[1]
483     debug = params[2]
484     return backend.InstanceOsAdd(inst, reinstall, debug)
485
486   @staticmethod
487   def perspective_instance_run_rename(params):
488     """Runs the OS rename script for an instance.
489
490     """
491     inst_s, old_name, debug = params
492     inst = objects.Instance.FromDict(inst_s)
493     return backend.RunRenameInstance(inst, old_name, debug)
494
495   @staticmethod
496   def perspective_instance_shutdown(params):
497     """Shutdown an instance.
498
499     """
500     instance = objects.Instance.FromDict(params[0])
501     timeout = params[1]
502     return backend.InstanceShutdown(instance, timeout)
503
504   @staticmethod
505   def perspective_instance_start(params):
506     """Start an instance.
507
508     """
509     instance = objects.Instance.FromDict(params[0])
510     return backend.StartInstance(instance)
511
512   @staticmethod
513   def perspective_migration_info(params):
514     """Gather information about an instance to be migrated.
515
516     """
517     instance = objects.Instance.FromDict(params[0])
518     return backend.MigrationInfo(instance)
519
520   @staticmethod
521   def perspective_accept_instance(params):
522     """Prepare the node to accept an instance.
523
524     """
525     instance, info, target = params
526     instance = objects.Instance.FromDict(instance)
527     return backend.AcceptInstance(instance, info, target)
528
529   @staticmethod
530   def perspective_finalize_migration(params):
531     """Finalize the instance migration.
532
533     """
534     instance, info, success = params
535     instance = objects.Instance.FromDict(instance)
536     return backend.FinalizeMigration(instance, info, success)
537
538   @staticmethod
539   def perspective_instance_migrate(params):
540     """Migrates an instance.
541
542     """
543     instance, target, live = params
544     instance = objects.Instance.FromDict(instance)
545     return backend.MigrateInstance(instance, target, live)
546
547   @staticmethod
548   def perspective_instance_reboot(params):
549     """Reboot an instance.
550
551     """
552     instance = objects.Instance.FromDict(params[0])
553     reboot_type = params[1]
554     shutdown_timeout = params[2]
555     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
556
557   @staticmethod
558   def perspective_instance_info(params):
559     """Query instance information.
560
561     """
562     return backend.GetInstanceInfo(params[0], params[1])
563
564   @staticmethod
565   def perspective_instance_migratable(params):
566     """Query whether the specified instance can be migrated.
567
568     """
569     instance = objects.Instance.FromDict(params[0])
570     return backend.GetInstanceMigratable(instance)
571
572   @staticmethod
573   def perspective_all_instances_info(params):
574     """Query information about all instances.
575
576     """
577     return backend.GetAllInstancesInfo(params[0])
578
579   @staticmethod
580   def perspective_instance_list(params):
581     """Query the list of running instances.
582
583     """
584     return backend.GetInstanceList(params[0])
585
586   # node --------------------------
587
588   @staticmethod
589   def perspective_node_tcp_ping(params):
590     """Do a TcpPing on the remote node.
591
592     """
593     return netutils.TcpPing(params[1], params[2], timeout=params[3],
594                             live_port_needed=params[4], source=params[0])
595
596   @staticmethod
597   def perspective_node_has_ip_address(params):
598     """Checks if a node has the given ip address.
599
600     """
601     return netutils.IPAddress.Own(params[0])
602
603   @staticmethod
604   def perspective_node_info(params):
605     """Query node information.
606
607     """
608     vgname, hypervisor_type = params
609     return backend.GetNodeInfo(vgname, hypervisor_type)
610
611   @staticmethod
612   def perspective_node_verify(params):
613     """Run a verify sequence on this node.
614
615     """
616     return backend.VerifyNode(params[0], params[1])
617
618   @staticmethod
619   def perspective_node_start_master(params):
620     """Promote this node to master status.
621
622     """
623     return backend.StartMaster(params[0], params[1])
624
625   @staticmethod
626   def perspective_node_stop_master(params):
627     """Demote this node from master status.
628
629     """
630     return backend.StopMaster(params[0])
631
632   @staticmethod
633   def perspective_node_leave_cluster(params):
634     """Cleanup after leaving a cluster.
635
636     """
637     return backend.LeaveCluster(params[0])
638
639   @staticmethod
640   def perspective_node_volumes(params):
641     """Query the list of all logical volume groups.
642
643     """
644     return backend.NodeVolumes()
645
646   @staticmethod
647   def perspective_node_demote_from_mc(params):
648     """Demote a node from the master candidate role.
649
650     """
651     return backend.DemoteFromMC()
652
653
654   @staticmethod
655   def perspective_node_powercycle(params):
656     """Tries to powercycle the nod.
657
658     """
659     hypervisor_type = params[0]
660     return backend.PowercycleNode(hypervisor_type)
661
662
663   # cluster --------------------------
664
665   @staticmethod
666   def perspective_version(params):
667     """Query version information.
668
669     """
670     return constants.PROTOCOL_VERSION
671
672   @staticmethod
673   def perspective_upload_file(params):
674     """Upload a file.
675
676     Note that the backend implementation imposes strict rules on which
677     files are accepted.
678
679     """
680     return backend.UploadFile(*params)
681
682   @staticmethod
683   def perspective_master_info(params):
684     """Query master information.
685
686     """
687     return backend.GetMasterInfo()
688
689   @staticmethod
690   def perspective_write_ssconf_files(params):
691     """Write ssconf files.
692
693     """
694     (values,) = params
695     return backend.WriteSsconfFiles(values)
696
697   # os -----------------------
698
699   @staticmethod
700   def perspective_os_diagnose(params):
701     """Query detailed information about existing OSes.
702
703     """
704     return backend.DiagnoseOS()
705
706   @staticmethod
707   def perspective_os_get(params):
708     """Query information about a given OS.
709
710     """
711     name = params[0]
712     os_obj = backend.OSFromDisk(name)
713     return os_obj.ToDict()
714
715   @staticmethod
716   def perspective_os_validate(params):
717     """Run a given OS' validation routine.
718
719     """
720     required, name, checks, params = params
721     return backend.ValidateOS(required, name, checks, params)
722
723   # hooks -----------------------
724
725   @staticmethod
726   def perspective_hooks_runner(params):
727     """Run hook scripts.
728
729     """
730     hpath, phase, env = params
731     hr = backend.HooksRunner()
732     return hr.RunHooks(hpath, phase, env)
733
734   # iallocator -----------------
735
736   @staticmethod
737   def perspective_iallocator_runner(params):
738     """Run an iallocator script.
739
740     """
741     name, idata = params
742     iar = backend.IAllocatorRunner()
743     return iar.Run(name, idata)
744
745   # test -----------------------
746
747   @staticmethod
748   def perspective_test_delay(params):
749     """Run test delay.
750
751     """
752     duration = params[0]
753     status, rval = utils.TestDelay(duration)
754     if not status:
755       raise backend.RPCFail(rval)
756     return rval
757
758   # file storage ---------------
759
760   @staticmethod
761   def perspective_file_storage_dir_create(params):
762     """Create the file storage directory.
763
764     """
765     file_storage_dir = params[0]
766     return backend.CreateFileStorageDir(file_storage_dir)
767
768   @staticmethod
769   def perspective_file_storage_dir_remove(params):
770     """Remove the file storage directory.
771
772     """
773     file_storage_dir = params[0]
774     return backend.RemoveFileStorageDir(file_storage_dir)
775
776   @staticmethod
777   def perspective_file_storage_dir_rename(params):
778     """Rename the file storage directory.
779
780     """
781     old_file_storage_dir = params[0]
782     new_file_storage_dir = params[1]
783     return backend.RenameFileStorageDir(old_file_storage_dir,
784                                         new_file_storage_dir)
785
786   # jobs ------------------------
787
788   @staticmethod
789   @_RequireJobQueueLock
790   def perspective_jobqueue_update(params):
791     """Update job queue.
792
793     """
794     (file_name, content) = params
795     return backend.JobQueueUpdate(file_name, content)
796
797   @staticmethod
798   @_RequireJobQueueLock
799   def perspective_jobqueue_purge(params):
800     """Purge job queue.
801
802     """
803     return backend.JobQueuePurge()
804
805   @staticmethod
806   @_RequireJobQueueLock
807   def perspective_jobqueue_rename(params):
808     """Rename a job queue file.
809
810     """
811     # TODO: What if a file fails to rename?
812     return [backend.JobQueueRename(old, new) for old, new in params]
813
814   # hypervisor ---------------
815
816   @staticmethod
817   def perspective_hypervisor_validate_params(params):
818     """Validate the hypervisor parameters.
819
820     """
821     (hvname, hvparams) = params
822     return backend.ValidateHVParams(hvname, hvparams)
823
824   # Crypto
825
826   @staticmethod
827   def perspective_x509_cert_create(params):
828     """Creates a new X509 certificate for SSL/TLS.
829
830     """
831     (validity, ) = params
832     return backend.CreateX509Certificate(validity)
833
834   @staticmethod
835   def perspective_x509_cert_remove(params):
836     """Removes a X509 certificate.
837
838     """
839     (name, ) = params
840     return backend.RemoveX509Certificate(name)
841
842   # Import and export
843
844   @staticmethod
845   def perspective_import_start(params):
846     """Starts an import daemon.
847
848     """
849     (opts_s, instance, dest, dest_args) = params
850
851     opts = objects.ImportExportOptions.FromDict(opts_s)
852
853     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
854                                            None, None,
855                                            objects.Instance.FromDict(instance),
856                                            dest,
857                                            _DecodeImportExportIO(dest,
858                                                                  dest_args))
859
860   @staticmethod
861   def perspective_export_start(params):
862     """Starts an export daemon.
863
864     """
865     (opts_s, host, port, instance, source, source_args) = params
866
867     opts = objects.ImportExportOptions.FromDict(opts_s)
868
869     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
870                                            host, port,
871                                            objects.Instance.FromDict(instance),
872                                            source,
873                                            _DecodeImportExportIO(source,
874                                                                  source_args))
875
876   @staticmethod
877   def perspective_impexp_status(params):
878     """Retrieves the status of an import or export daemon.
879
880     """
881     return backend.GetImportExportStatus(params[0])
882
883   @staticmethod
884   def perspective_impexp_abort(params):
885     """Aborts an import or export.
886
887     """
888     return backend.AbortImportExport(params[0])
889
890   @staticmethod
891   def perspective_impexp_cleanup(params):
892     """Cleans up after an import or export.
893
894     """
895     return backend.CleanupImportExport(params[0])
896
897
898 def CheckNoded(_, args):
899   """Initial checks whether to run or exit with a failure.
900
901   """
902   if args: # noded doesn't take any arguments
903     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
904                           sys.argv[0])
905     sys.exit(constants.EXIT_FAILURE)
906
907
908 def ExecNoded(options, _):
909   """Main node daemon function, executed with the PID file held.
910
911   """
912   if options.mlock:
913     request_executor_class = MlockallRequestExecutor
914     try:
915       utils.Mlockall()
916     except errors.NoCtypesError:
917       logging.warning("Cannot set memory lock, ctypes module not found")
918       request_executor_class = http.server.HttpServerRequestExecutor
919   else:
920     request_executor_class = http.server.HttpServerRequestExecutor
921
922   # Read SSL certificate
923   if options.ssl:
924     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
925                                     ssl_cert_path=options.ssl_cert)
926   else:
927     ssl_params = None
928
929   err = _PrepareQueueLock()
930   if err is not None:
931     # this might be some kind of file-system/permission error; while
932     # this breaks the job queue functionality, we shouldn't prevent
933     # startup of the whole node daemon because of this
934     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
935
936   mainloop = daemon.Mainloop()
937   server = NodeHttpServer(mainloop, options.bind_address, options.port,
938                           ssl_params=ssl_params, ssl_verify_peer=True,
939                           request_executor_class=request_executor_class)
940   server.Start()
941   try:
942     mainloop.Run()
943   finally:
944     server.Stop()
945
946
947 def main():
948   """Main function for the node daemon.
949
950   """
951   parser = OptionParser(description="Ganeti node daemon",
952                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
953                         version="%%prog (ganeti) %s" %
954                         constants.RELEASE_VERSION)
955   parser.add_option("--no-mlock", dest="mlock",
956                     help="Do not mlock the node memory in ram",
957                     default=True, action="store_false")
958
959   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
960   dirs.append((constants.LOG_OS_DIR, 0750))
961   dirs.append((constants.LOCK_DIR, 1777))
962   dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE))
963   dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
964   daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
965                      default_ssl_cert=constants.NODED_CERT_FILE,
966                      default_ssl_key=constants.NODED_CERT_FILE,
967                      console_logging=True)
968
969
970 if __name__ == '__main__':
971   main()