DeprecationWarning fixes for pylint
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51
52 import ganeti.http.server # pylint: disable=W0611
53
54
55 queue_lock = None
56
57
58 def _PrepareQueueLock():
59   """Try to prepare the queue lock.
60
61   @return: None for success, otherwise an exception object
62
63   """
64   global queue_lock # pylint: disable=W0603
65
66   if queue_lock is not None:
67     return None
68
69   # Prepare job queue
70   try:
71     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
72     return None
73   except EnvironmentError, err:
74     return err
75
76
77 def _RequireJobQueueLock(fn):
78   """Decorator for job queue manipulating functions.
79
80   """
81   QUEUE_LOCK_TIMEOUT = 10
82
83   def wrapper(*args, **kwargs):
84     # Locking in exclusive, blocking mode because there could be several
85     # children running at the same time. Waiting up to 10 seconds.
86     if _PrepareQueueLock() is not None:
87       raise errors.JobQueueError("Job queue failed initialization,"
88                                  " cannot update jobs")
89     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
90     try:
91       return fn(*args, **kwargs)
92     finally:
93       queue_lock.Unlock()
94
95   return wrapper
96
97
98 def _DecodeImportExportIO(ieio, ieioargs):
99   """Decodes import/export I/O information.
100
101   """
102   if ieio == constants.IEIO_RAW_DISK:
103     assert len(ieioargs) == 1
104     return (objects.Disk.FromDict(ieioargs[0]), )
105
106   if ieio == constants.IEIO_SCRIPT:
107     assert len(ieioargs) == 2
108     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
109
110   return ieioargs
111
112
113 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
114   """Custom Request Executor class that ensures NodeHttpServer children are
115   locked in ram.
116
117   """
118   def __init__(self, *args, **kwargs):
119     utils.Mlockall()
120
121     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
122
123
124 class NodeHttpServer(http.server.HttpServer):
125   """The server implementation.
126
127   This class holds all methods exposed over the RPC interface.
128
129   """
130   # too many public methods, and unused args - all methods get params
131   # due to the API
132   # pylint: disable=R0904,W0613
133   def __init__(self, *args, **kwargs):
134     http.server.HttpServer.__init__(self, *args, **kwargs)
135     self.noded_pid = os.getpid()
136
137   def HandleRequest(self, req):
138     """Handle a request.
139
140     """
141     if req.request_method.upper() != http.HTTP_PUT:
142       raise http.HttpBadRequest()
143
144     path = req.request_path
145     if path.startswith("/"):
146       path = path[1:]
147
148     method = getattr(self, "perspective_%s" % path, None)
149     if method is None:
150       raise http.HttpNotFound()
151
152     try:
153       result = (True, method(serializer.LoadJson(req.request_body)))
154
155     except backend.RPCFail, err:
156       # our custom failure exception; str(err) works fine if the
157       # exception was constructed with a single argument, and in
158       # this case, err.message == err.args[0] == str(err)
159       result = (False, str(err))
160     except errors.QuitGanetiException, err:
161       # Tell parent to quit
162       logging.info("Shutting down the node daemon, arguments: %s",
163                    str(err.args))
164       os.kill(self.noded_pid, signal.SIGTERM)
165       # And return the error's arguments, which must be already in
166       # correct tuple format
167       result = err.args
168     except Exception, err:
169       logging.exception("Error in RPC call")
170       result = (False, "Error while executing backend function: %s" % str(err))
171
172     return serializer.DumpJson(result, indent=False)
173
174   # the new block devices  --------------------------
175
176   @staticmethod
177   def perspective_blockdev_create(params):
178     """Create a block device.
179
180     """
181     bdev_s, size, owner, on_primary, info = params
182     bdev = objects.Disk.FromDict(bdev_s)
183     if bdev is None:
184       raise ValueError("can't unserialize data!")
185     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
186
187   @staticmethod
188   def perspective_blockdev_pause_resume_sync(params):
189     """Pause/resume sync of a block device.
190
191     """
192     disks_s, pause = params
193     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
194     return backend.BlockdevPauseResumeSync(disks, pause)
195
196   @staticmethod
197   def perspective_blockdev_wipe(params):
198     """Wipe a block device.
199
200     """
201     bdev_s, offset, size = params
202     bdev = objects.Disk.FromDict(bdev_s)
203     return backend.BlockdevWipe(bdev, offset, size)
204
205   @staticmethod
206   def perspective_blockdev_remove(params):
207     """Remove a block device.
208
209     """
210     bdev_s = params[0]
211     bdev = objects.Disk.FromDict(bdev_s)
212     return backend.BlockdevRemove(bdev)
213
214   @staticmethod
215   def perspective_blockdev_rename(params):
216     """Remove a block device.
217
218     """
219     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
220     return backend.BlockdevRename(devlist)
221
222   @staticmethod
223   def perspective_blockdev_assemble(params):
224     """Assemble a block device.
225
226     """
227     bdev_s, owner, on_primary, idx = params
228     bdev = objects.Disk.FromDict(bdev_s)
229     if bdev is None:
230       raise ValueError("can't unserialize data!")
231     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
232
233   @staticmethod
234   def perspective_blockdev_shutdown(params):
235     """Shutdown a block device.
236
237     """
238     bdev_s = params[0]
239     bdev = objects.Disk.FromDict(bdev_s)
240     if bdev is None:
241       raise ValueError("can't unserialize data!")
242     return backend.BlockdevShutdown(bdev)
243
244   @staticmethod
245   def perspective_blockdev_addchildren(params):
246     """Add a child to a mirror device.
247
248     Note: this is only valid for mirror devices. It's the caller's duty
249     to send a correct disk, otherwise we raise an error.
250
251     """
252     bdev_s, ndev_s = params
253     bdev = objects.Disk.FromDict(bdev_s)
254     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
255     if bdev is None or ndevs.count(None) > 0:
256       raise ValueError("can't unserialize data!")
257     return backend.BlockdevAddchildren(bdev, ndevs)
258
259   @staticmethod
260   def perspective_blockdev_removechildren(params):
261     """Remove a child from a mirror device.
262
263     This is only valid for mirror devices, of course. It's the callers
264     duty to send a correct disk, otherwise we raise an error.
265
266     """
267     bdev_s, ndev_s = params
268     bdev = objects.Disk.FromDict(bdev_s)
269     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
270     if bdev is None or ndevs.count(None) > 0:
271       raise ValueError("can't unserialize data!")
272     return backend.BlockdevRemovechildren(bdev, ndevs)
273
274   @staticmethod
275   def perspective_blockdev_getmirrorstatus(params):
276     """Return the mirror status for a list of disks.
277
278     """
279     disks = [objects.Disk.FromDict(dsk_s)
280              for dsk_s in params]
281     return [status.ToDict()
282             for status in backend.BlockdevGetmirrorstatus(disks)]
283
284   @staticmethod
285   def perspective_blockdev_getmirrorstatus_multi(params):
286     """Return the mirror status for a list of disks.
287
288     """
289     (node_disks, ) = params
290
291     node_name = netutils.Hostname.GetSysName()
292
293     disks = [objects.Disk.FromDict(dsk_s)
294              for dsk_s in node_disks.get(node_name, [])]
295
296     result = []
297
298     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
299       if success:
300         result.append((success, status.ToDict()))
301       else:
302         result.append((success, status))
303
304     return result
305
306   @staticmethod
307   def perspective_blockdev_find(params):
308     """Expose the FindBlockDevice functionality for a disk.
309
310     This will try to find but not activate a disk.
311
312     """
313     disk = objects.Disk.FromDict(params[0])
314
315     result = backend.BlockdevFind(disk)
316     if result is None:
317       return None
318
319     return result.ToDict()
320
321   @staticmethod
322   def perspective_blockdev_snapshot(params):
323     """Create a snapshot device.
324
325     Note that this is only valid for LVM disks, if we get passed
326     something else we raise an exception. The snapshot device can be
327     remove by calling the generic block device remove call.
328
329     """
330     cfbd = objects.Disk.FromDict(params[0])
331     return backend.BlockdevSnapshot(cfbd)
332
333   @staticmethod
334   def perspective_blockdev_grow(params):
335     """Grow a stack of devices.
336
337     """
338     cfbd = objects.Disk.FromDict(params[0])
339     amount = params[1]
340     dryrun = params[2]
341     return backend.BlockdevGrow(cfbd, amount, dryrun)
342
343   @staticmethod
344   def perspective_blockdev_close(params):
345     """Closes the given block devices.
346
347     """
348     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
349     return backend.BlockdevClose(params[0], disks)
350
351   @staticmethod
352   def perspective_blockdev_getsize(params):
353     """Compute the sizes of the given block devices.
354
355     """
356     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
357     return backend.BlockdevGetsize(disks)
358
359   @staticmethod
360   def perspective_blockdev_export(params):
361     """Compute the sizes of the given block devices.
362
363     """
364     disk = objects.Disk.FromDict(params[0])
365     dest_node, dest_path, cluster_name = params[1:]
366     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
367
368   # blockdev/drbd specific methods ----------
369
370   @staticmethod
371   def perspective_drbd_disconnect_net(params):
372     """Disconnects the network connection of drbd disks.
373
374     Note that this is only valid for drbd disks, so the members of the
375     disk list must all be drbd devices.
376
377     """
378     nodes_ip, disks = params
379     disks = [objects.Disk.FromDict(cf) for cf in disks]
380     return backend.DrbdDisconnectNet(nodes_ip, disks)
381
382   @staticmethod
383   def perspective_drbd_attach_net(params):
384     """Attaches the network connection of drbd disks.
385
386     Note that this is only valid for drbd disks, so the members of the
387     disk list must all be drbd devices.
388
389     """
390     nodes_ip, disks, instance_name, multimaster = params
391     disks = [objects.Disk.FromDict(cf) for cf in disks]
392     return backend.DrbdAttachNet(nodes_ip, disks,
393                                      instance_name, multimaster)
394
395   @staticmethod
396   def perspective_drbd_wait_sync(params):
397     """Wait until DRBD disks are synched.
398
399     Note that this is only valid for drbd disks, so the members of the
400     disk list must all be drbd devices.
401
402     """
403     nodes_ip, disks = params
404     disks = [objects.Disk.FromDict(cf) for cf in disks]
405     return backend.DrbdWaitSync(nodes_ip, disks)
406
407   @staticmethod
408   def perspective_drbd_helper(params):
409     """Query drbd helper.
410
411     """
412     return backend.GetDrbdUsermodeHelper()
413
414   # export/import  --------------------------
415
416   @staticmethod
417   def perspective_finalize_export(params):
418     """Expose the finalize export functionality.
419
420     """
421     instance = objects.Instance.FromDict(params[0])
422
423     snap_disks = []
424     for disk in params[1]:
425       if isinstance(disk, bool):
426         snap_disks.append(disk)
427       else:
428         snap_disks.append(objects.Disk.FromDict(disk))
429
430     return backend.FinalizeExport(instance, snap_disks)
431
432   @staticmethod
433   def perspective_export_info(params):
434     """Query information about an existing export on this node.
435
436     The given path may not contain an export, in which case we return
437     None.
438
439     """
440     path = params[0]
441     return backend.ExportInfo(path)
442
443   @staticmethod
444   def perspective_export_list(params):
445     """List the available exports on this node.
446
447     Note that as opposed to export_info, which may query data about an
448     export in any path, this only queries the standard Ganeti path
449     (constants.EXPORT_DIR).
450
451     """
452     return backend.ListExports()
453
454   @staticmethod
455   def perspective_export_remove(params):
456     """Remove an export.
457
458     """
459     export = params[0]
460     return backend.RemoveExport(export)
461
462   # block device ---------------------
463   @staticmethod
464   def perspective_bdev_sizes(params):
465     """Query the list of block devices
466
467     """
468     devices = params[0]
469     return backend.GetBlockDevSizes(devices)
470
471   # volume  --------------------------
472
473   @staticmethod
474   def perspective_lv_list(params):
475     """Query the list of logical volumes in a given volume group.
476
477     """
478     vgname = params[0]
479     return backend.GetVolumeList(vgname)
480
481   @staticmethod
482   def perspective_vg_list(params):
483     """Query the list of volume groups.
484
485     """
486     return backend.ListVolumeGroups()
487
488   # Storage --------------------------
489
490   @staticmethod
491   def perspective_storage_list(params):
492     """Get list of storage units.
493
494     """
495     (su_name, su_args, name, fields) = params
496     return storage.GetStorage(su_name, *su_args).List(name, fields)
497
498   @staticmethod
499   def perspective_storage_modify(params):
500     """Modify a storage unit.
501
502     """
503     (su_name, su_args, name, changes) = params
504     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
505
506   @staticmethod
507   def perspective_storage_execute(params):
508     """Execute an operation on a storage unit.
509
510     """
511     (su_name, su_args, name, op) = params
512     return storage.GetStorage(su_name, *su_args).Execute(name, op)
513
514   # bridge  --------------------------
515
516   @staticmethod
517   def perspective_bridges_exist(params):
518     """Check if all bridges given exist on this node.
519
520     """
521     bridges_list = params[0]
522     return backend.BridgesExist(bridges_list)
523
524   # instance  --------------------------
525
526   @staticmethod
527   def perspective_instance_os_add(params):
528     """Install an OS on a given instance.
529
530     """
531     inst_s = params[0]
532     inst = objects.Instance.FromDict(inst_s)
533     reinstall = params[1]
534     debug = params[2]
535     return backend.InstanceOsAdd(inst, reinstall, debug)
536
537   @staticmethod
538   def perspective_instance_run_rename(params):
539     """Runs the OS rename script for an instance.
540
541     """
542     inst_s, old_name, debug = params
543     inst = objects.Instance.FromDict(inst_s)
544     return backend.RunRenameInstance(inst, old_name, debug)
545
546   @staticmethod
547   def perspective_instance_shutdown(params):
548     """Shutdown an instance.
549
550     """
551     instance = objects.Instance.FromDict(params[0])
552     timeout = params[1]
553     return backend.InstanceShutdown(instance, timeout)
554
555   @staticmethod
556   def perspective_instance_start(params):
557     """Start an instance.
558
559     """
560     (instance_name, startup_paused) = params
561     instance = objects.Instance.FromDict(instance_name)
562     return backend.StartInstance(instance, startup_paused)
563
564   @staticmethod
565   def perspective_migration_info(params):
566     """Gather information about an instance to be migrated.
567
568     """
569     instance = objects.Instance.FromDict(params[0])
570     return backend.MigrationInfo(instance)
571
572   @staticmethod
573   def perspective_accept_instance(params):
574     """Prepare the node to accept an instance.
575
576     """
577     instance, info, target = params
578     instance = objects.Instance.FromDict(instance)
579     return backend.AcceptInstance(instance, info, target)
580
581   @staticmethod
582   def perspective_finalize_migration(params):
583     """Finalize the instance migration.
584
585     """
586     instance, info, success = params
587     instance = objects.Instance.FromDict(instance)
588     return backend.FinalizeMigration(instance, info, success)
589
590   @staticmethod
591   def perspective_instance_migrate(params):
592     """Migrates an instance.
593
594     """
595     instance, target, live = params
596     instance = objects.Instance.FromDict(instance)
597     return backend.MigrateInstance(instance, target, live)
598
599   @staticmethod
600   def perspective_instance_reboot(params):
601     """Reboot an instance.
602
603     """
604     instance = objects.Instance.FromDict(params[0])
605     reboot_type = params[1]
606     shutdown_timeout = params[2]
607     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
608
609   @staticmethod
610   def perspective_instance_info(params):
611     """Query instance information.
612
613     """
614     return backend.GetInstanceInfo(params[0], params[1])
615
616   @staticmethod
617   def perspective_instance_migratable(params):
618     """Query whether the specified instance can be migrated.
619
620     """
621     instance = objects.Instance.FromDict(params[0])
622     return backend.GetInstanceMigratable(instance)
623
624   @staticmethod
625   def perspective_all_instances_info(params):
626     """Query information about all instances.
627
628     """
629     return backend.GetAllInstancesInfo(params[0])
630
631   @staticmethod
632   def perspective_instance_list(params):
633     """Query the list of running instances.
634
635     """
636     return backend.GetInstanceList(params[0])
637
638   # node --------------------------
639
640   @staticmethod
641   def perspective_node_tcp_ping(params):
642     """Do a TcpPing on the remote node.
643
644     """
645     return netutils.TcpPing(params[1], params[2], timeout=params[3],
646                             live_port_needed=params[4], source=params[0])
647
648   @staticmethod
649   def perspective_node_has_ip_address(params):
650     """Checks if a node has the given ip address.
651
652     """
653     return netutils.IPAddress.Own(params[0])
654
655   @staticmethod
656   def perspective_node_info(params):
657     """Query node information.
658
659     """
660     vgname, hypervisor_type = params
661     return backend.GetNodeInfo(vgname, hypervisor_type)
662
663   @staticmethod
664   def perspective_etc_hosts_modify(params):
665     """Modify a node entry in /etc/hosts.
666
667     """
668     backend.EtcHostsModify(params[0], params[1], params[2])
669
670     return True
671
672   @staticmethod
673   def perspective_node_verify(params):
674     """Run a verify sequence on this node.
675
676     """
677     return backend.VerifyNode(params[0], params[1])
678
679   @staticmethod
680   def perspective_node_start_master(params):
681     """Promote this node to master status.
682
683     """
684     return backend.StartMaster(params[0], params[1])
685
686   @staticmethod
687   def perspective_node_stop_master(params):
688     """Demote this node from master status.
689
690     """
691     return backend.StopMaster(params[0])
692
693   @staticmethod
694   def perspective_node_leave_cluster(params):
695     """Cleanup after leaving a cluster.
696
697     """
698     return backend.LeaveCluster(params[0])
699
700   @staticmethod
701   def perspective_node_volumes(params):
702     """Query the list of all logical volume groups.
703
704     """
705     return backend.NodeVolumes()
706
707   @staticmethod
708   def perspective_node_demote_from_mc(params):
709     """Demote a node from the master candidate role.
710
711     """
712     return backend.DemoteFromMC()
713
714   @staticmethod
715   def perspective_node_powercycle(params):
716     """Tries to powercycle the nod.
717
718     """
719     hypervisor_type = params[0]
720     return backend.PowercycleNode(hypervisor_type)
721
722   # cluster --------------------------
723
724   @staticmethod
725   def perspective_version(params):
726     """Query version information.
727
728     """
729     return constants.PROTOCOL_VERSION
730
731   @staticmethod
732   def perspective_upload_file(params):
733     """Upload a file.
734
735     Note that the backend implementation imposes strict rules on which
736     files are accepted.
737
738     """
739     return backend.UploadFile(*params)
740
741   @staticmethod
742   def perspective_master_info(params):
743     """Query master information.
744
745     """
746     return backend.GetMasterInfo()
747
748   @staticmethod
749   def perspective_run_oob(params):
750     """Runs oob on node.
751
752     """
753     output = backend.RunOob(params[0], params[1], params[2], params[3])
754     if output:
755       result = serializer.LoadJson(output)
756     else:
757       result = None
758     return result
759
760   @staticmethod
761   def perspective_write_ssconf_files(params):
762     """Write ssconf files.
763
764     """
765     (values,) = params
766     return backend.WriteSsconfFiles(values)
767
768   # os -----------------------
769
770   @staticmethod
771   def perspective_os_diagnose(params):
772     """Query detailed information about existing OSes.
773
774     """
775     return backend.DiagnoseOS()
776
777   @staticmethod
778   def perspective_os_get(params):
779     """Query information about a given OS.
780
781     """
782     name = params[0]
783     os_obj = backend.OSFromDisk(name)
784     return os_obj.ToDict()
785
786   @staticmethod
787   def perspective_os_validate(params):
788     """Run a given OS' validation routine.
789
790     """
791     required, name, checks, params = params
792     return backend.ValidateOS(required, name, checks, params)
793
794   # hooks -----------------------
795
796   @staticmethod
797   def perspective_hooks_runner(params):
798     """Run hook scripts.
799
800     """
801     hpath, phase, env = params
802     hr = backend.HooksRunner()
803     return hr.RunHooks(hpath, phase, env)
804
805   # iallocator -----------------
806
807   @staticmethod
808   def perspective_iallocator_runner(params):
809     """Run an iallocator script.
810
811     """
812     name, idata = params
813     iar = backend.IAllocatorRunner()
814     return iar.Run(name, idata)
815
816   # test -----------------------
817
818   @staticmethod
819   def perspective_test_delay(params):
820     """Run test delay.
821
822     """
823     duration = params[0]
824     status, rval = utils.TestDelay(duration)
825     if not status:
826       raise backend.RPCFail(rval)
827     return rval
828
829   # file storage ---------------
830
831   @staticmethod
832   def perspective_file_storage_dir_create(params):
833     """Create the file storage directory.
834
835     """
836     file_storage_dir = params[0]
837     return backend.CreateFileStorageDir(file_storage_dir)
838
839   @staticmethod
840   def perspective_file_storage_dir_remove(params):
841     """Remove the file storage directory.
842
843     """
844     file_storage_dir = params[0]
845     return backend.RemoveFileStorageDir(file_storage_dir)
846
847   @staticmethod
848   def perspective_file_storage_dir_rename(params):
849     """Rename the file storage directory.
850
851     """
852     old_file_storage_dir = params[0]
853     new_file_storage_dir = params[1]
854     return backend.RenameFileStorageDir(old_file_storage_dir,
855                                         new_file_storage_dir)
856
857   # jobs ------------------------
858
859   @staticmethod
860   @_RequireJobQueueLock
861   def perspective_jobqueue_update(params):
862     """Update job queue.
863
864     """
865     (file_name, content) = params
866     return backend.JobQueueUpdate(file_name, content)
867
868   @staticmethod
869   @_RequireJobQueueLock
870   def perspective_jobqueue_purge(params):
871     """Purge job queue.
872
873     """
874     return backend.JobQueuePurge()
875
876   @staticmethod
877   @_RequireJobQueueLock
878   def perspective_jobqueue_rename(params):
879     """Rename a job queue file.
880
881     """
882     # TODO: What if a file fails to rename?
883     return [backend.JobQueueRename(old, new) for old, new in params]
884
885   # hypervisor ---------------
886
887   @staticmethod
888   def perspective_hypervisor_validate_params(params):
889     """Validate the hypervisor parameters.
890
891     """
892     (hvname, hvparams) = params
893     return backend.ValidateHVParams(hvname, hvparams)
894
895   # Crypto
896
897   @staticmethod
898   def perspective_x509_cert_create(params):
899     """Creates a new X509 certificate for SSL/TLS.
900
901     """
902     (validity, ) = params
903     return backend.CreateX509Certificate(validity)
904
905   @staticmethod
906   def perspective_x509_cert_remove(params):
907     """Removes a X509 certificate.
908
909     """
910     (name, ) = params
911     return backend.RemoveX509Certificate(name)
912
913   # Import and export
914
915   @staticmethod
916   def perspective_import_start(params):
917     """Starts an import daemon.
918
919     """
920     (opts_s, instance, component, dest, dest_args) = params
921
922     opts = objects.ImportExportOptions.FromDict(opts_s)
923
924     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
925                                            None, None,
926                                            objects.Instance.FromDict(instance),
927                                            component, dest,
928                                            _DecodeImportExportIO(dest,
929                                                                  dest_args))
930
931   @staticmethod
932   def perspective_export_start(params):
933     """Starts an export daemon.
934
935     """
936     (opts_s, host, port, instance, component, source, source_args) = params
937
938     opts = objects.ImportExportOptions.FromDict(opts_s)
939
940     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
941                                            host, port,
942                                            objects.Instance.FromDict(instance),
943                                            component, source,
944                                            _DecodeImportExportIO(source,
945                                                                  source_args))
946
947   @staticmethod
948   def perspective_impexp_status(params):
949     """Retrieves the status of an import or export daemon.
950
951     """
952     return backend.GetImportExportStatus(params[0])
953
954   @staticmethod
955   def perspective_impexp_abort(params):
956     """Aborts an import or export.
957
958     """
959     return backend.AbortImportExport(params[0])
960
961   @staticmethod
962   def perspective_impexp_cleanup(params):
963     """Cleans up after an import or export.
964
965     """
966     return backend.CleanupImportExport(params[0])
967
968
969 def CheckNoded(_, args):
970   """Initial checks whether to run or exit with a failure.
971
972   """
973   if args: # noded doesn't take any arguments
974     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
975                           sys.argv[0])
976     sys.exit(constants.EXIT_FAILURE)
977   try:
978     codecs.lookup("string-escape")
979   except LookupError:
980     print >> sys.stderr, ("Can't load the string-escape code which is part"
981                           " of the Python installation. Is your installation"
982                           " complete/correct? Aborting.")
983     sys.exit(constants.EXIT_FAILURE)
984
985
986 def PrepNoded(options, _):
987   """Preparation node daemon function, executed with the PID file held.
988
989   """
990   if options.mlock:
991     request_executor_class = MlockallRequestExecutor
992     try:
993       utils.Mlockall()
994     except errors.NoCtypesError:
995       logging.warning("Cannot set memory lock, ctypes module not found")
996       request_executor_class = http.server.HttpServerRequestExecutor
997   else:
998     request_executor_class = http.server.HttpServerRequestExecutor
999
1000   # Read SSL certificate
1001   if options.ssl:
1002     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1003                                     ssl_cert_path=options.ssl_cert)
1004   else:
1005     ssl_params = None
1006
1007   err = _PrepareQueueLock()
1008   if err is not None:
1009     # this might be some kind of file-system/permission error; while
1010     # this breaks the job queue functionality, we shouldn't prevent
1011     # startup of the whole node daemon because of this
1012     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1013
1014   mainloop = daemon.Mainloop()
1015   server = NodeHttpServer(mainloop, options.bind_address, options.port,
1016                           ssl_params=ssl_params, ssl_verify_peer=True,
1017                           request_executor_class=request_executor_class)
1018   server.Start()
1019   return (mainloop, server)
1020
1021
1022 def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1023   """Main node daemon function, executed with the PID file held.
1024
1025   """
1026   (mainloop, server) = prep_data
1027   try:
1028     mainloop.Run()
1029   finally:
1030     server.Stop()
1031
1032
1033 def Main():
1034   """Main function for the node daemon.
1035
1036   """
1037   parser = OptionParser(description="Ganeti node daemon",
1038                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
1039                         version="%%prog (ganeti) %s" %
1040                         constants.RELEASE_VERSION)
1041   parser.add_option("--no-mlock", dest="mlock",
1042                     help="Do not mlock the node memory in ram",
1043                     default=True, action="store_false")
1044
1045   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1046                      default_ssl_cert=constants.NODED_CERT_FILE,
1047                      default_ssl_key=constants.NODED_CERT_FILE,
1048                      console_logging=True)