Bump pep8 version to 1.2
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51
52 import ganeti.http.server # pylint: disable=W0611
53
54
55 queue_lock = None
56
57
58 def _PrepareQueueLock():
59   """Try to prepare the queue lock.
60
61   @return: None for success, otherwise an exception object
62
63   """
64   global queue_lock # pylint: disable=W0603
65
66   if queue_lock is not None:
67     return None
68
69   # Prepare job queue
70   try:
71     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
72     return None
73   except EnvironmentError, err:
74     return err
75
76
77 def _RequireJobQueueLock(fn):
78   """Decorator for job queue manipulating functions.
79
80   """
81   QUEUE_LOCK_TIMEOUT = 10
82
83   def wrapper(*args, **kwargs):
84     # Locking in exclusive, blocking mode because there could be several
85     # children running at the same time. Waiting up to 10 seconds.
86     if _PrepareQueueLock() is not None:
87       raise errors.JobQueueError("Job queue failed initialization,"
88                                  " cannot update jobs")
89     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
90     try:
91       return fn(*args, **kwargs)
92     finally:
93       queue_lock.Unlock()
94
95   return wrapper
96
97
98 def _DecodeImportExportIO(ieio, ieioargs):
99   """Decodes import/export I/O information.
100
101   """
102   if ieio == constants.IEIO_RAW_DISK:
103     assert len(ieioargs) == 1
104     return (objects.Disk.FromDict(ieioargs[0]), )
105
106   if ieio == constants.IEIO_SCRIPT:
107     assert len(ieioargs) == 2
108     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
109
110   return ieioargs
111
112
113 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
114   """Subclass ensuring request handlers are locked in RAM.
115
116   """
117   def __init__(self, *args, **kwargs):
118     utils.Mlockall()
119
120     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
121
122
123 class NodeRequestHandler(http.server.HttpServerHandler):
124   """The server implementation.
125
126   This class holds all methods exposed over the RPC interface.
127
128   """
129   # too many public methods, and unused args - all methods get params
130   # due to the API
131   # pylint: disable=R0904,W0613
132   def __init__(self):
133     http.server.HttpServerHandler.__init__(self)
134     self.noded_pid = os.getpid()
135
136   def HandleRequest(self, req):
137     """Handle a request.
138
139     """
140     # FIXME: Remove HTTP_PUT in Ganeti 2.7
141     if req.request_method.upper() not in (http.HTTP_PUT, http.HTTP_POST):
142       raise http.HttpBadRequest("Only PUT and POST methods are supported")
143
144     path = req.request_path
145     if path.startswith("/"):
146       path = path[1:]
147
148     method = getattr(self, "perspective_%s" % path, None)
149     if method is None:
150       raise http.HttpNotFound()
151
152     try:
153       result = (True, method(serializer.LoadJson(req.request_body)))
154
155     except backend.RPCFail, err:
156       # our custom failure exception; str(err) works fine if the
157       # exception was constructed with a single argument, and in
158       # this case, err.message == err.args[0] == str(err)
159       result = (False, str(err))
160     except errors.QuitGanetiException, err:
161       # Tell parent to quit
162       logging.info("Shutting down the node daemon, arguments: %s",
163                    str(err.args))
164       os.kill(self.noded_pid, signal.SIGTERM)
165       # And return the error's arguments, which must be already in
166       # correct tuple format
167       result = err.args
168     except Exception, err:
169       logging.exception("Error in RPC call")
170       result = (False, "Error while executing backend function: %s" % str(err))
171
172     return serializer.DumpJson(result)
173
174   # the new block devices  --------------------------
175
176   @staticmethod
177   def perspective_blockdev_create(params):
178     """Create a block device.
179
180     """
181     bdev_s, size, owner, on_primary, info = params
182     bdev = objects.Disk.FromDict(bdev_s)
183     if bdev is None:
184       raise ValueError("can't unserialize data!")
185     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
186
187   @staticmethod
188   def perspective_blockdev_pause_resume_sync(params):
189     """Pause/resume sync of a block device.
190
191     """
192     disks_s, pause = params
193     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
194     return backend.BlockdevPauseResumeSync(disks, pause)
195
196   @staticmethod
197   def perspective_blockdev_wipe(params):
198     """Wipe a block device.
199
200     """
201     bdev_s, offset, size = params
202     bdev = objects.Disk.FromDict(bdev_s)
203     return backend.BlockdevWipe(bdev, offset, size)
204
205   @staticmethod
206   def perspective_blockdev_remove(params):
207     """Remove a block device.
208
209     """
210     bdev_s = params[0]
211     bdev = objects.Disk.FromDict(bdev_s)
212     return backend.BlockdevRemove(bdev)
213
214   @staticmethod
215   def perspective_blockdev_rename(params):
216     """Remove a block device.
217
218     """
219     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
220     return backend.BlockdevRename(devlist)
221
222   @staticmethod
223   def perspective_blockdev_assemble(params):
224     """Assemble a block device.
225
226     """
227     bdev_s, owner, on_primary, idx = params
228     bdev = objects.Disk.FromDict(bdev_s)
229     if bdev is None:
230       raise ValueError("can't unserialize data!")
231     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
232
233   @staticmethod
234   def perspective_blockdev_shutdown(params):
235     """Shutdown a block device.
236
237     """
238     bdev_s = params[0]
239     bdev = objects.Disk.FromDict(bdev_s)
240     if bdev is None:
241       raise ValueError("can't unserialize data!")
242     return backend.BlockdevShutdown(bdev)
243
244   @staticmethod
245   def perspective_blockdev_addchildren(params):
246     """Add a child to a mirror device.
247
248     Note: this is only valid for mirror devices. It's the caller's duty
249     to send a correct disk, otherwise we raise an error.
250
251     """
252     bdev_s, ndev_s = params
253     bdev = objects.Disk.FromDict(bdev_s)
254     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
255     if bdev is None or ndevs.count(None) > 0:
256       raise ValueError("can't unserialize data!")
257     return backend.BlockdevAddchildren(bdev, ndevs)
258
259   @staticmethod
260   def perspective_blockdev_removechildren(params):
261     """Remove a child from a mirror device.
262
263     This is only valid for mirror devices, of course. It's the callers
264     duty to send a correct disk, otherwise we raise an error.
265
266     """
267     bdev_s, ndev_s = params
268     bdev = objects.Disk.FromDict(bdev_s)
269     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
270     if bdev is None or ndevs.count(None) > 0:
271       raise ValueError("can't unserialize data!")
272     return backend.BlockdevRemovechildren(bdev, ndevs)
273
274   @staticmethod
275   def perspective_blockdev_getmirrorstatus(params):
276     """Return the mirror status for a list of disks.
277
278     """
279     disks = [objects.Disk.FromDict(dsk_s)
280              for dsk_s in params[0]]
281     return [status.ToDict()
282             for status in backend.BlockdevGetmirrorstatus(disks)]
283
284   @staticmethod
285   def perspective_blockdev_getmirrorstatus_multi(params):
286     """Return the mirror status for a list of disks.
287
288     """
289     (node_disks, ) = params
290
291     disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
292
293     result = []
294
295     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
296       if success:
297         result.append((success, status.ToDict()))
298       else:
299         result.append((success, status))
300
301     return result
302
303   @staticmethod
304   def perspective_blockdev_find(params):
305     """Expose the FindBlockDevice functionality for a disk.
306
307     This will try to find but not activate a disk.
308
309     """
310     disk = objects.Disk.FromDict(params[0])
311
312     result = backend.BlockdevFind(disk)
313     if result is None:
314       return None
315
316     return result.ToDict()
317
318   @staticmethod
319   def perspective_blockdev_snapshot(params):
320     """Create a snapshot device.
321
322     Note that this is only valid for LVM disks, if we get passed
323     something else we raise an exception. The snapshot device can be
324     remove by calling the generic block device remove call.
325
326     """
327     cfbd = objects.Disk.FromDict(params[0])
328     return backend.BlockdevSnapshot(cfbd)
329
330   @staticmethod
331   def perspective_blockdev_grow(params):
332     """Grow a stack of devices.
333
334     """
335     if len(params) < 4:
336       raise ValueError("Received only 3 parameters in blockdev_grow,"
337                        " old master?")
338     cfbd = objects.Disk.FromDict(params[0])
339     amount = params[1]
340     dryrun = params[2]
341     backingstore = params[3]
342     return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore)
343
344   @staticmethod
345   def perspective_blockdev_close(params):
346     """Closes the given block devices.
347
348     """
349     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
350     return backend.BlockdevClose(params[0], disks)
351
352   @staticmethod
353   def perspective_blockdev_getsize(params):
354     """Compute the sizes of the given block devices.
355
356     """
357     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
358     return backend.BlockdevGetsize(disks)
359
360   @staticmethod
361   def perspective_blockdev_export(params):
362     """Compute the sizes of the given block devices.
363
364     """
365     disk = objects.Disk.FromDict(params[0])
366     dest_node, dest_path, cluster_name = params[1:]
367     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
368
369   # blockdev/drbd specific methods ----------
370
371   @staticmethod
372   def perspective_drbd_disconnect_net(params):
373     """Disconnects the network connection of drbd disks.
374
375     Note that this is only valid for drbd disks, so the members of the
376     disk list must all be drbd devices.
377
378     """
379     nodes_ip, disks = params
380     disks = [objects.Disk.FromDict(cf) for cf in disks]
381     return backend.DrbdDisconnectNet(nodes_ip, disks)
382
383   @staticmethod
384   def perspective_drbd_attach_net(params):
385     """Attaches the network connection of drbd disks.
386
387     Note that this is only valid for drbd disks, so the members of the
388     disk list must all be drbd devices.
389
390     """
391     nodes_ip, disks, instance_name, multimaster = params
392     disks = [objects.Disk.FromDict(cf) for cf in disks]
393     return backend.DrbdAttachNet(nodes_ip, disks,
394                                      instance_name, multimaster)
395
396   @staticmethod
397   def perspective_drbd_wait_sync(params):
398     """Wait until DRBD disks are synched.
399
400     Note that this is only valid for drbd disks, so the members of the
401     disk list must all be drbd devices.
402
403     """
404     nodes_ip, disks = params
405     disks = [objects.Disk.FromDict(cf) for cf in disks]
406     return backend.DrbdWaitSync(nodes_ip, disks)
407
408   @staticmethod
409   def perspective_drbd_helper(params):
410     """Query drbd helper.
411
412     """
413     return backend.GetDrbdUsermodeHelper()
414
415   # export/import  --------------------------
416
417   @staticmethod
418   def perspective_finalize_export(params):
419     """Expose the finalize export functionality.
420
421     """
422     instance = objects.Instance.FromDict(params[0])
423
424     snap_disks = []
425     for disk in params[1]:
426       if isinstance(disk, bool):
427         snap_disks.append(disk)
428       else:
429         snap_disks.append(objects.Disk.FromDict(disk))
430
431     return backend.FinalizeExport(instance, snap_disks)
432
433   @staticmethod
434   def perspective_export_info(params):
435     """Query information about an existing export on this node.
436
437     The given path may not contain an export, in which case we return
438     None.
439
440     """
441     path = params[0]
442     return backend.ExportInfo(path)
443
444   @staticmethod
445   def perspective_export_list(params):
446     """List the available exports on this node.
447
448     Note that as opposed to export_info, which may query data about an
449     export in any path, this only queries the standard Ganeti path
450     (constants.EXPORT_DIR).
451
452     """
453     return backend.ListExports()
454
455   @staticmethod
456   def perspective_export_remove(params):
457     """Remove an export.
458
459     """
460     export = params[0]
461     return backend.RemoveExport(export)
462
463   # block device ---------------------
464   @staticmethod
465   def perspective_bdev_sizes(params):
466     """Query the list of block devices
467
468     """
469     devices = params[0]
470     return backend.GetBlockDevSizes(devices)
471
472   # volume  --------------------------
473
474   @staticmethod
475   def perspective_lv_list(params):
476     """Query the list of logical volumes in a given volume group.
477
478     """
479     vgname = params[0]
480     return backend.GetVolumeList(vgname)
481
482   @staticmethod
483   def perspective_vg_list(params):
484     """Query the list of volume groups.
485
486     """
487     return backend.ListVolumeGroups()
488
489   # Storage --------------------------
490
491   @staticmethod
492   def perspective_storage_list(params):
493     """Get list of storage units.
494
495     """
496     (su_name, su_args, name, fields) = params
497     return storage.GetStorage(su_name, *su_args).List(name, fields)
498
499   @staticmethod
500   def perspective_storage_modify(params):
501     """Modify a storage unit.
502
503     """
504     (su_name, su_args, name, changes) = params
505     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
506
507   @staticmethod
508   def perspective_storage_execute(params):
509     """Execute an operation on a storage unit.
510
511     """
512     (su_name, su_args, name, op) = params
513     return storage.GetStorage(su_name, *su_args).Execute(name, op)
514
515   # bridge  --------------------------
516
517   @staticmethod
518   def perspective_bridges_exist(params):
519     """Check if all bridges given exist on this node.
520
521     """
522     bridges_list = params[0]
523     return backend.BridgesExist(bridges_list)
524
525   # instance  --------------------------
526
527   @staticmethod
528   def perspective_instance_os_add(params):
529     """Install an OS on a given instance.
530
531     """
532     inst_s = params[0]
533     inst = objects.Instance.FromDict(inst_s)
534     reinstall = params[1]
535     debug = params[2]
536     return backend.InstanceOsAdd(inst, reinstall, debug)
537
538   @staticmethod
539   def perspective_instance_run_rename(params):
540     """Runs the OS rename script for an instance.
541
542     """
543     inst_s, old_name, debug = params
544     inst = objects.Instance.FromDict(inst_s)
545     return backend.RunRenameInstance(inst, old_name, debug)
546
547   @staticmethod
548   def perspective_instance_shutdown(params):
549     """Shutdown an instance.
550
551     """
552     instance = objects.Instance.FromDict(params[0])
553     timeout = params[1]
554     return backend.InstanceShutdown(instance, timeout)
555
556   @staticmethod
557   def perspective_instance_start(params):
558     """Start an instance.
559
560     """
561     (instance_name, startup_paused) = params
562     instance = objects.Instance.FromDict(instance_name)
563     return backend.StartInstance(instance, startup_paused)
564
565   @staticmethod
566   def perspective_migration_info(params):
567     """Gather information about an instance to be migrated.
568
569     """
570     instance = objects.Instance.FromDict(params[0])
571     return backend.MigrationInfo(instance)
572
573   @staticmethod
574   def perspective_accept_instance(params):
575     """Prepare the node to accept an instance.
576
577     """
578     instance, info, target = params
579     instance = objects.Instance.FromDict(instance)
580     return backend.AcceptInstance(instance, info, target)
581
582   @staticmethod
583   def perspective_instance_finalize_migration_dst(params):
584     """Finalize the instance migration on the destination node.
585
586     """
587     instance, info, success = params
588     instance = objects.Instance.FromDict(instance)
589     return backend.FinalizeMigrationDst(instance, info, success)
590
591   @staticmethod
592   def perspective_instance_migrate(params):
593     """Migrates an instance.
594
595     """
596     instance, target, live = params
597     instance = objects.Instance.FromDict(instance)
598     return backend.MigrateInstance(instance, target, live)
599
600   @staticmethod
601   def perspective_instance_finalize_migration_src(params):
602     """Finalize the instance migration on the source node.
603
604     """
605     instance, success, live = params
606     instance = objects.Instance.FromDict(instance)
607     return backend.FinalizeMigrationSource(instance, success, live)
608
609   @staticmethod
610   def perspective_instance_get_migration_status(params):
611     """Reports migration status.
612
613     """
614     instance = objects.Instance.FromDict(params[0])
615     return backend.GetMigrationStatus(instance).ToDict()
616
617   @staticmethod
618   def perspective_instance_reboot(params):
619     """Reboot an instance.
620
621     """
622     instance = objects.Instance.FromDict(params[0])
623     reboot_type = params[1]
624     shutdown_timeout = params[2]
625     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
626
627   @staticmethod
628   def perspective_instance_balloon_memory(params):
629     """Modify instance runtime memory.
630
631     """
632     instance_dict, memory = params
633     instance = objects.Instance.FromDict(instance_dict)
634     return backend.InstanceBalloonMemory(instance, memory)
635
636   @staticmethod
637   def perspective_instance_info(params):
638     """Query instance information.
639
640     """
641     return backend.GetInstanceInfo(params[0], params[1])
642
643   @staticmethod
644   def perspective_instance_migratable(params):
645     """Query whether the specified instance can be migrated.
646
647     """
648     instance = objects.Instance.FromDict(params[0])
649     return backend.GetInstanceMigratable(instance)
650
651   @staticmethod
652   def perspective_all_instances_info(params):
653     """Query information about all instances.
654
655     """
656     return backend.GetAllInstancesInfo(params[0])
657
658   @staticmethod
659   def perspective_instance_list(params):
660     """Query the list of running instances.
661
662     """
663     return backend.GetInstanceList(params[0])
664
665   # node --------------------------
666
667   @staticmethod
668   def perspective_node_has_ip_address(params):
669     """Checks if a node has the given ip address.
670
671     """
672     return netutils.IPAddress.Own(params[0])
673
674   @staticmethod
675   def perspective_node_info(params):
676     """Query node information.
677
678     """
679     (vg_names, hv_names) = params
680     return backend.GetNodeInfo(vg_names, hv_names)
681
682   @staticmethod
683   def perspective_etc_hosts_modify(params):
684     """Modify a node entry in /etc/hosts.
685
686     """
687     backend.EtcHostsModify(params[0], params[1], params[2])
688
689     return True
690
691   @staticmethod
692   def perspective_node_verify(params):
693     """Run a verify sequence on this node.
694
695     """
696     return backend.VerifyNode(params[0], params[1])
697
698   @staticmethod
699   def perspective_node_start_master_daemons(params):
700     """Start the master daemons on this node.
701
702     """
703     return backend.StartMasterDaemons(params[0])
704
705   @staticmethod
706   def perspective_node_activate_master_ip(params):
707     """Activate the master IP on this node.
708
709     """
710     master_params = objects.MasterNetworkParameters.FromDict(params[0])
711     return backend.ActivateMasterIp(master_params, params[1])
712
713   @staticmethod
714   def perspective_node_deactivate_master_ip(params):
715     """Deactivate the master IP on this node.
716
717     """
718     master_params = objects.MasterNetworkParameters.FromDict(params[0])
719     return backend.DeactivateMasterIp(master_params, params[1])
720
721   @staticmethod
722   def perspective_node_stop_master(params):
723     """Stops master daemons on this node.
724
725     """
726     return backend.StopMasterDaemons()
727
728   @staticmethod
729   def perspective_node_change_master_netmask(params):
730     """Change the master IP netmask.
731
732     """
733     return backend.ChangeMasterNetmask(params[0], params[1], params[2],
734                                        params[3])
735
736   @staticmethod
737   def perspective_node_leave_cluster(params):
738     """Cleanup after leaving a cluster.
739
740     """
741     return backend.LeaveCluster(params[0])
742
743   @staticmethod
744   def perspective_node_volumes(params):
745     """Query the list of all logical volume groups.
746
747     """
748     return backend.NodeVolumes()
749
750   @staticmethod
751   def perspective_node_demote_from_mc(params):
752     """Demote a node from the master candidate role.
753
754     """
755     return backend.DemoteFromMC()
756
757   @staticmethod
758   def perspective_node_powercycle(params):
759     """Tries to powercycle the nod.
760
761     """
762     hypervisor_type = params[0]
763     return backend.PowercycleNode(hypervisor_type)
764
765   # cluster --------------------------
766
767   @staticmethod
768   def perspective_version(params):
769     """Query version information.
770
771     """
772     return constants.PROTOCOL_VERSION
773
774   @staticmethod
775   def perspective_upload_file(params):
776     """Upload a file.
777
778     Note that the backend implementation imposes strict rules on which
779     files are accepted.
780
781     """
782     return backend.UploadFile(*(params[0]))
783
784   @staticmethod
785   def perspective_master_info(params):
786     """Query master information.
787
788     """
789     return backend.GetMasterInfo()
790
791   @staticmethod
792   def perspective_run_oob(params):
793     """Runs oob on node.
794
795     """
796     output = backend.RunOob(params[0], params[1], params[2], params[3])
797     if output:
798       result = serializer.LoadJson(output)
799     else:
800       result = None
801     return result
802
803   @staticmethod
804   def perspective_write_ssconf_files(params):
805     """Write ssconf files.
806
807     """
808     (values,) = params
809     return backend.WriteSsconfFiles(values)
810
811   # os -----------------------
812
813   @staticmethod
814   def perspective_os_diagnose(params):
815     """Query detailed information about existing OSes.
816
817     """
818     return backend.DiagnoseOS()
819
820   @staticmethod
821   def perspective_os_get(params):
822     """Query information about a given OS.
823
824     """
825     name = params[0]
826     os_obj = backend.OSFromDisk(name)
827     return os_obj.ToDict()
828
829   @staticmethod
830   def perspective_os_validate(params):
831     """Run a given OS' validation routine.
832
833     """
834     required, name, checks, params = params
835     return backend.ValidateOS(required, name, checks, params)
836
837   # hooks -----------------------
838
839   @staticmethod
840   def perspective_hooks_runner(params):
841     """Run hook scripts.
842
843     """
844     hpath, phase, env = params
845     hr = backend.HooksRunner()
846     return hr.RunHooks(hpath, phase, env)
847
848   # iallocator -----------------
849
850   @staticmethod
851   def perspective_iallocator_runner(params):
852     """Run an iallocator script.
853
854     """
855     name, idata = params
856     iar = backend.IAllocatorRunner()
857     return iar.Run(name, idata)
858
859   # test -----------------------
860
861   @staticmethod
862   def perspective_test_delay(params):
863     """Run test delay.
864
865     """
866     duration = params[0]
867     status, rval = utils.TestDelay(duration)
868     if not status:
869       raise backend.RPCFail(rval)
870     return rval
871
872   # file storage ---------------
873
874   @staticmethod
875   def perspective_file_storage_dir_create(params):
876     """Create the file storage directory.
877
878     """
879     file_storage_dir = params[0]
880     return backend.CreateFileStorageDir(file_storage_dir)
881
882   @staticmethod
883   def perspective_file_storage_dir_remove(params):
884     """Remove the file storage directory.
885
886     """
887     file_storage_dir = params[0]
888     return backend.RemoveFileStorageDir(file_storage_dir)
889
890   @staticmethod
891   def perspective_file_storage_dir_rename(params):
892     """Rename the file storage directory.
893
894     """
895     old_file_storage_dir = params[0]
896     new_file_storage_dir = params[1]
897     return backend.RenameFileStorageDir(old_file_storage_dir,
898                                         new_file_storage_dir)
899
900   # jobs ------------------------
901
902   @staticmethod
903   @_RequireJobQueueLock
904   def perspective_jobqueue_update(params):
905     """Update job queue.
906
907     """
908     (file_name, content) = params
909     return backend.JobQueueUpdate(file_name, content)
910
911   @staticmethod
912   @_RequireJobQueueLock
913   def perspective_jobqueue_purge(params):
914     """Purge job queue.
915
916     """
917     return backend.JobQueuePurge()
918
919   @staticmethod
920   @_RequireJobQueueLock
921   def perspective_jobqueue_rename(params):
922     """Rename a job queue file.
923
924     """
925     # TODO: What if a file fails to rename?
926     return [backend.JobQueueRename(old, new) for old, new in params[0]]
927
928   # hypervisor ---------------
929
930   @staticmethod
931   def perspective_hypervisor_validate_params(params):
932     """Validate the hypervisor parameters.
933
934     """
935     (hvname, hvparams) = params
936     return backend.ValidateHVParams(hvname, hvparams)
937
938   # Crypto
939
940   @staticmethod
941   def perspective_x509_cert_create(params):
942     """Creates a new X509 certificate for SSL/TLS.
943
944     """
945     (validity, ) = params
946     return backend.CreateX509Certificate(validity)
947
948   @staticmethod
949   def perspective_x509_cert_remove(params):
950     """Removes a X509 certificate.
951
952     """
953     (name, ) = params
954     return backend.RemoveX509Certificate(name)
955
956   # Import and export
957
958   @staticmethod
959   def perspective_import_start(params):
960     """Starts an import daemon.
961
962     """
963     (opts_s, instance, component, (dest, dest_args)) = params
964
965     opts = objects.ImportExportOptions.FromDict(opts_s)
966
967     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
968                                            None, None,
969                                            objects.Instance.FromDict(instance),
970                                            component, dest,
971                                            _DecodeImportExportIO(dest,
972                                                                  dest_args))
973
974   @staticmethod
975   def perspective_export_start(params):
976     """Starts an export daemon.
977
978     """
979     (opts_s, host, port, instance, component, (source, source_args)) = params
980
981     opts = objects.ImportExportOptions.FromDict(opts_s)
982
983     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
984                                            host, port,
985                                            objects.Instance.FromDict(instance),
986                                            component, source,
987                                            _DecodeImportExportIO(source,
988                                                                  source_args))
989
990   @staticmethod
991   def perspective_impexp_status(params):
992     """Retrieves the status of an import or export daemon.
993
994     """
995     return backend.GetImportExportStatus(params[0])
996
997   @staticmethod
998   def perspective_impexp_abort(params):
999     """Aborts an import or export.
1000
1001     """
1002     return backend.AbortImportExport(params[0])
1003
1004   @staticmethod
1005   def perspective_impexp_cleanup(params):
1006     """Cleans up after an import or export.
1007
1008     """
1009     return backend.CleanupImportExport(params[0])
1010
1011
1012 def CheckNoded(_, args):
1013   """Initial checks whether to run or exit with a failure.
1014
1015   """
1016   if args: # noded doesn't take any arguments
1017     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1018                           sys.argv[0])
1019     sys.exit(constants.EXIT_FAILURE)
1020   try:
1021     codecs.lookup("string-escape")
1022   except LookupError:
1023     print >> sys.stderr, ("Can't load the string-escape code which is part"
1024                           " of the Python installation. Is your installation"
1025                           " complete/correct? Aborting.")
1026     sys.exit(constants.EXIT_FAILURE)
1027
1028
1029 def PrepNoded(options, _):
1030   """Preparation node daemon function, executed with the PID file held.
1031
1032   """
1033   if options.mlock:
1034     request_executor_class = MlockallRequestExecutor
1035     try:
1036       utils.Mlockall()
1037     except errors.NoCtypesError:
1038       logging.warning("Cannot set memory lock, ctypes module not found")
1039       request_executor_class = http.server.HttpServerRequestExecutor
1040   else:
1041     request_executor_class = http.server.HttpServerRequestExecutor
1042
1043   # Read SSL certificate
1044   if options.ssl:
1045     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1046                                     ssl_cert_path=options.ssl_cert)
1047   else:
1048     ssl_params = None
1049
1050   err = _PrepareQueueLock()
1051   if err is not None:
1052     # this might be some kind of file-system/permission error; while
1053     # this breaks the job queue functionality, we shouldn't prevent
1054     # startup of the whole node daemon because of this
1055     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1056
1057   handler = NodeRequestHandler()
1058
1059   mainloop = daemon.Mainloop()
1060   server = \
1061     http.server.HttpServer(mainloop, options.bind_address, options.port,
1062                            handler, ssl_params=ssl_params, ssl_verify_peer=True,
1063                            request_executor_class=request_executor_class)
1064   server.Start()
1065
1066   return (mainloop, server)
1067
1068
1069 def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1070   """Main node daemon function, executed with the PID file held.
1071
1072   """
1073   (mainloop, server) = prep_data
1074   try:
1075     mainloop.Run()
1076   finally:
1077     server.Stop()
1078
1079
1080 def Main():
1081   """Main function for the node daemon.
1082
1083   """
1084   parser = OptionParser(description="Ganeti node daemon",
1085                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
1086                         version="%%prog (ganeti) %s" %
1087                         constants.RELEASE_VERSION)
1088   parser.add_option("--no-mlock", dest="mlock",
1089                     help="Do not mlock the node memory in ram",
1090                     default=True, action="store_false")
1091
1092   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1093                      default_ssl_cert=constants.NODED_CERT_FILE,
1094                      default_ssl_key=constants.NODED_CERT_FILE,
1095                      console_logging=True)