Merge branch 'stable-2.5' into devel-2.5
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51
52 import ganeti.http.server # pylint: disable=W0611
53
54
55 queue_lock = None
56
57
58 def _PrepareQueueLock():
59   """Try to prepare the queue lock.
60
61   @return: None for success, otherwise an exception object
62
63   """
64   global queue_lock # pylint: disable=W0603
65
66   if queue_lock is not None:
67     return None
68
69   # Prepare job queue
70   try:
71     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
72     return None
73   except EnvironmentError, err:
74     return err
75
76
77 def _RequireJobQueueLock(fn):
78   """Decorator for job queue manipulating functions.
79
80   """
81   QUEUE_LOCK_TIMEOUT = 10
82
83   def wrapper(*args, **kwargs):
84     # Locking in exclusive, blocking mode because there could be several
85     # children running at the same time. Waiting up to 10 seconds.
86     if _PrepareQueueLock() is not None:
87       raise errors.JobQueueError("Job queue failed initialization,"
88                                  " cannot update jobs")
89     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
90     try:
91       return fn(*args, **kwargs)
92     finally:
93       queue_lock.Unlock()
94
95   return wrapper
96
97
98 def _DecodeImportExportIO(ieio, ieioargs):
99   """Decodes import/export I/O information.
100
101   """
102   if ieio == constants.IEIO_RAW_DISK:
103     assert len(ieioargs) == 1
104     return (objects.Disk.FromDict(ieioargs[0]), )
105
106   if ieio == constants.IEIO_SCRIPT:
107     assert len(ieioargs) == 2
108     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
109
110   return ieioargs
111
112
113 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
114   """Custom Request Executor class that ensures NodeHttpServer children are
115   locked in ram.
116
117   """
118   def __init__(self, *args, **kwargs):
119     utils.Mlockall()
120
121     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
122
123
124 class NodeHttpServer(http.server.HttpServer):
125   """The server implementation.
126
127   This class holds all methods exposed over the RPC interface.
128
129   """
130   # too many public methods, and unused args - all methods get params
131   # due to the API
132   # pylint: disable=R0904,W0613
133   def __init__(self, *args, **kwargs):
134     http.server.HttpServer.__init__(self, *args, **kwargs)
135     self.noded_pid = os.getpid()
136
137   def HandleRequest(self, req):
138     """Handle a request.
139
140     """
141     if req.request_method.upper() != http.HTTP_PUT:
142       raise http.HttpBadRequest()
143
144     path = req.request_path
145     if path.startswith("/"):
146       path = path[1:]
147
148     method = getattr(self, "perspective_%s" % path, None)
149     if method is None:
150       raise http.HttpNotFound()
151
152     try:
153       result = (True, method(serializer.LoadJson(req.request_body)))
154
155     except backend.RPCFail, err:
156       # our custom failure exception; str(err) works fine if the
157       # exception was constructed with a single argument, and in
158       # this case, err.message == err.args[0] == str(err)
159       result = (False, str(err))
160     except errors.QuitGanetiException, err:
161       # Tell parent to quit
162       logging.info("Shutting down the node daemon, arguments: %s",
163                    str(err.args))
164       os.kill(self.noded_pid, signal.SIGTERM)
165       # And return the error's arguments, which must be already in
166       # correct tuple format
167       result = err.args
168     except Exception, err:
169       logging.exception("Error in RPC call")
170       result = (False, "Error while executing backend function: %s" % str(err))
171
172     return serializer.DumpJson(result, indent=False)
173
174   # the new block devices  --------------------------
175
176   @staticmethod
177   def perspective_blockdev_create(params):
178     """Create a block device.
179
180     """
181     bdev_s, size, owner, on_primary, info = params
182     bdev = objects.Disk.FromDict(bdev_s)
183     if bdev is None:
184       raise ValueError("can't unserialize data!")
185     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
186
187   @staticmethod
188   def perspective_blockdev_pause_resume_sync(params):
189     """Pause/resume sync of a block device.
190
191     """
192     disks_s, pause = params
193     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
194     return backend.BlockdevPauseResumeSync(disks, pause)
195
196   @staticmethod
197   def perspective_blockdev_wipe(params):
198     """Wipe a block device.
199
200     """
201     bdev_s, offset, size = params
202     bdev = objects.Disk.FromDict(bdev_s)
203     return backend.BlockdevWipe(bdev, offset, size)
204
205   @staticmethod
206   def perspective_blockdev_remove(params):
207     """Remove a block device.
208
209     """
210     bdev_s = params[0]
211     bdev = objects.Disk.FromDict(bdev_s)
212     return backend.BlockdevRemove(bdev)
213
214   @staticmethod
215   def perspective_blockdev_rename(params):
216     """Remove a block device.
217
218     """
219     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
220     return backend.BlockdevRename(devlist)
221
222   @staticmethod
223   def perspective_blockdev_assemble(params):
224     """Assemble a block device.
225
226     """
227     bdev_s, owner, on_primary, idx = params
228     bdev = objects.Disk.FromDict(bdev_s)
229     if bdev is None:
230       raise ValueError("can't unserialize data!")
231     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
232
233   @staticmethod
234   def perspective_blockdev_shutdown(params):
235     """Shutdown a block device.
236
237     """
238     bdev_s = params[0]
239     bdev = objects.Disk.FromDict(bdev_s)
240     if bdev is None:
241       raise ValueError("can't unserialize data!")
242     return backend.BlockdevShutdown(bdev)
243
244   @staticmethod
245   def perspective_blockdev_addchildren(params):
246     """Add a child to a mirror device.
247
248     Note: this is only valid for mirror devices. It's the caller's duty
249     to send a correct disk, otherwise we raise an error.
250
251     """
252     bdev_s, ndev_s = params
253     bdev = objects.Disk.FromDict(bdev_s)
254     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
255     if bdev is None or ndevs.count(None) > 0:
256       raise ValueError("can't unserialize data!")
257     return backend.BlockdevAddchildren(bdev, ndevs)
258
259   @staticmethod
260   def perspective_blockdev_removechildren(params):
261     """Remove a child from a mirror device.
262
263     This is only valid for mirror devices, of course. It's the callers
264     duty to send a correct disk, otherwise we raise an error.
265
266     """
267     bdev_s, ndev_s = params
268     bdev = objects.Disk.FromDict(bdev_s)
269     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
270     if bdev is None or ndevs.count(None) > 0:
271       raise ValueError("can't unserialize data!")
272     return backend.BlockdevRemovechildren(bdev, ndevs)
273
274   @staticmethod
275   def perspective_blockdev_getmirrorstatus(params):
276     """Return the mirror status for a list of disks.
277
278     """
279     disks = [objects.Disk.FromDict(dsk_s)
280              for dsk_s in params]
281     return [status.ToDict()
282             for status in backend.BlockdevGetmirrorstatus(disks)]
283
284   @staticmethod
285   def perspective_blockdev_getmirrorstatus_multi(params):
286     """Return the mirror status for a list of disks.
287
288     """
289     (node_disks, ) = params
290
291     node_name = netutils.Hostname.GetSysName()
292
293     disks = [objects.Disk.FromDict(dsk_s)
294              for dsk_s in node_disks.get(node_name, [])]
295
296     result = []
297
298     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
299       if success:
300         result.append((success, status.ToDict()))
301       else:
302         result.append((success, status))
303
304     return result
305
306   @staticmethod
307   def perspective_blockdev_find(params):
308     """Expose the FindBlockDevice functionality for a disk.
309
310     This will try to find but not activate a disk.
311
312     """
313     disk = objects.Disk.FromDict(params[0])
314
315     result = backend.BlockdevFind(disk)
316     if result is None:
317       return None
318
319     return result.ToDict()
320
321   @staticmethod
322   def perspective_blockdev_snapshot(params):
323     """Create a snapshot device.
324
325     Note that this is only valid for LVM disks, if we get passed
326     something else we raise an exception. The snapshot device can be
327     remove by calling the generic block device remove call.
328
329     """
330     cfbd = objects.Disk.FromDict(params[0])
331     return backend.BlockdevSnapshot(cfbd)
332
333   @staticmethod
334   def perspective_blockdev_grow(params):
335     """Grow a stack of devices.
336
337     """
338     cfbd = objects.Disk.FromDict(params[0])
339     amount = params[1]
340     dryrun = params[2]
341     return backend.BlockdevGrow(cfbd, amount, dryrun)
342
343   @staticmethod
344   def perspective_blockdev_close(params):
345     """Closes the given block devices.
346
347     """
348     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
349     return backend.BlockdevClose(params[0], disks)
350
351   @staticmethod
352   def perspective_blockdev_getsize(params):
353     """Compute the sizes of the given block devices.
354
355     """
356     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
357     return backend.BlockdevGetsize(disks)
358
359   @staticmethod
360   def perspective_blockdev_export(params):
361     """Compute the sizes of the given block devices.
362
363     """
364     disk = objects.Disk.FromDict(params[0])
365     dest_node, dest_path, cluster_name = params[1:]
366     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
367
368   # blockdev/drbd specific methods ----------
369
370   @staticmethod
371   def perspective_drbd_disconnect_net(params):
372     """Disconnects the network connection of drbd disks.
373
374     Note that this is only valid for drbd disks, so the members of the
375     disk list must all be drbd devices.
376
377     """
378     nodes_ip, disks = params
379     disks = [objects.Disk.FromDict(cf) for cf in disks]
380     return backend.DrbdDisconnectNet(nodes_ip, disks)
381
382   @staticmethod
383   def perspective_drbd_attach_net(params):
384     """Attaches the network connection of drbd disks.
385
386     Note that this is only valid for drbd disks, so the members of the
387     disk list must all be drbd devices.
388
389     """
390     nodes_ip, disks, instance_name, multimaster = params
391     disks = [objects.Disk.FromDict(cf) for cf in disks]
392     return backend.DrbdAttachNet(nodes_ip, disks,
393                                      instance_name, multimaster)
394
395   @staticmethod
396   def perspective_drbd_wait_sync(params):
397     """Wait until DRBD disks are synched.
398
399     Note that this is only valid for drbd disks, so the members of the
400     disk list must all be drbd devices.
401
402     """
403     nodes_ip, disks = params
404     disks = [objects.Disk.FromDict(cf) for cf in disks]
405     return backend.DrbdWaitSync(nodes_ip, disks)
406
407   @staticmethod
408   def perspective_drbd_helper(params):
409     """Query drbd helper.
410
411     """
412     return backend.GetDrbdUsermodeHelper()
413
414   # export/import  --------------------------
415
416   @staticmethod
417   def perspective_finalize_export(params):
418     """Expose the finalize export functionality.
419
420     """
421     instance = objects.Instance.FromDict(params[0])
422
423     snap_disks = []
424     for disk in params[1]:
425       if isinstance(disk, bool):
426         snap_disks.append(disk)
427       else:
428         snap_disks.append(objects.Disk.FromDict(disk))
429
430     return backend.FinalizeExport(instance, snap_disks)
431
432   @staticmethod
433   def perspective_export_info(params):
434     """Query information about an existing export on this node.
435
436     The given path may not contain an export, in which case we return
437     None.
438
439     """
440     path = params[0]
441     return backend.ExportInfo(path)
442
443   @staticmethod
444   def perspective_export_list(params):
445     """List the available exports on this node.
446
447     Note that as opposed to export_info, which may query data about an
448     export in any path, this only queries the standard Ganeti path
449     (constants.EXPORT_DIR).
450
451     """
452     return backend.ListExports()
453
454   @staticmethod
455   def perspective_export_remove(params):
456     """Remove an export.
457
458     """
459     export = params[0]
460     return backend.RemoveExport(export)
461
462   # block device ---------------------
463   @staticmethod
464   def perspective_bdev_sizes(params):
465     """Query the list of block devices
466
467     """
468     devices = params[0]
469     return backend.GetBlockDevSizes(devices)
470
471   # volume  --------------------------
472
473   @staticmethod
474   def perspective_lv_list(params):
475     """Query the list of logical volumes in a given volume group.
476
477     """
478     vgname = params[0]
479     return backend.GetVolumeList(vgname)
480
481   @staticmethod
482   def perspective_vg_list(params):
483     """Query the list of volume groups.
484
485     """
486     return backend.ListVolumeGroups()
487
488   # Storage --------------------------
489
490   @staticmethod
491   def perspective_storage_list(params):
492     """Get list of storage units.
493
494     """
495     (su_name, su_args, name, fields) = params
496     return storage.GetStorage(su_name, *su_args).List(name, fields)
497
498   @staticmethod
499   def perspective_storage_modify(params):
500     """Modify a storage unit.
501
502     """
503     (su_name, su_args, name, changes) = params
504     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
505
506   @staticmethod
507   def perspective_storage_execute(params):
508     """Execute an operation on a storage unit.
509
510     """
511     (su_name, su_args, name, op) = params
512     return storage.GetStorage(su_name, *su_args).Execute(name, op)
513
514   # bridge  --------------------------
515
516   @staticmethod
517   def perspective_bridges_exist(params):
518     """Check if all bridges given exist on this node.
519
520     """
521     bridges_list = params[0]
522     return backend.BridgesExist(bridges_list)
523
524   # instance  --------------------------
525
526   @staticmethod
527   def perspective_instance_os_add(params):
528     """Install an OS on a given instance.
529
530     """
531     inst_s = params[0]
532     inst = objects.Instance.FromDict(inst_s)
533     reinstall = params[1]
534     debug = params[2]
535     return backend.InstanceOsAdd(inst, reinstall, debug)
536
537   @staticmethod
538   def perspective_instance_run_rename(params):
539     """Runs the OS rename script for an instance.
540
541     """
542     inst_s, old_name, debug = params
543     inst = objects.Instance.FromDict(inst_s)
544     return backend.RunRenameInstance(inst, old_name, debug)
545
546   @staticmethod
547   def perspective_instance_shutdown(params):
548     """Shutdown an instance.
549
550     """
551     instance = objects.Instance.FromDict(params[0])
552     timeout = params[1]
553     return backend.InstanceShutdown(instance, timeout)
554
555   @staticmethod
556   def perspective_instance_start(params):
557     """Start an instance.
558
559     """
560     (instance_name, startup_paused) = params
561     instance = objects.Instance.FromDict(instance_name)
562     return backend.StartInstance(instance, startup_paused)
563
564   @staticmethod
565   def perspective_migration_info(params):
566     """Gather information about an instance to be migrated.
567
568     """
569     instance = objects.Instance.FromDict(params[0])
570     return backend.MigrationInfo(instance)
571
572   @staticmethod
573   def perspective_accept_instance(params):
574     """Prepare the node to accept an instance.
575
576     """
577     instance, info, target = params
578     instance = objects.Instance.FromDict(instance)
579     return backend.AcceptInstance(instance, info, target)
580
581   @staticmethod
582   def perspective_finalize_migration(params):
583     """Finalize the instance migration.
584
585     """
586     instance, info, success = params
587     instance = objects.Instance.FromDict(instance)
588     return backend.FinalizeMigration(instance, info, success)
589
590   @staticmethod
591   def perspective_instance_migrate(params):
592     """Migrates an instance.
593
594     """
595     instance, target, live = params
596     instance = objects.Instance.FromDict(instance)
597     return backend.MigrateInstance(instance, target, live)
598
599   @staticmethod
600   def perspective_instance_reboot(params):
601     """Reboot an instance.
602
603     """
604     instance = objects.Instance.FromDict(params[0])
605     reboot_type = params[1]
606     shutdown_timeout = params[2]
607     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
608
609   @staticmethod
610   def perspective_instance_info(params):
611     """Query instance information.
612
613     """
614     return backend.GetInstanceInfo(params[0], params[1])
615
616   @staticmethod
617   def perspective_instance_migratable(params):
618     """Query whether the specified instance can be migrated.
619
620     """
621     instance = objects.Instance.FromDict(params[0])
622     return backend.GetInstanceMigratable(instance)
623
624   @staticmethod
625   def perspective_all_instances_info(params):
626     """Query information about all instances.
627
628     """
629     return backend.GetAllInstancesInfo(params[0])
630
631   @staticmethod
632   def perspective_instance_list(params):
633     """Query the list of running instances.
634
635     """
636     return backend.GetInstanceList(params[0])
637
638   # node --------------------------
639
640   @staticmethod
641   def perspective_node_tcp_ping(params):
642     """Do a TcpPing on the remote node.
643
644     """
645     return netutils.TcpPing(params[1], params[2], timeout=params[3],
646                             live_port_needed=params[4], source=params[0])
647
648   @staticmethod
649   def perspective_node_has_ip_address(params):
650     """Checks if a node has the given ip address.
651
652     """
653     return netutils.IPAddress.Own(params[0])
654
655   @staticmethod
656   def perspective_node_info(params):
657     """Query node information.
658
659     """
660     vgname, hypervisor_type = params
661     return backend.GetNodeInfo(vgname, hypervisor_type)
662
663   @staticmethod
664   def perspective_etc_hosts_modify(params):
665     """Modify a node entry in /etc/hosts.
666
667     """
668     backend.EtcHostsModify(params[0], params[1], params[2])
669
670     return True
671
672   @staticmethod
673   def perspective_node_verify(params):
674     """Run a verify sequence on this node.
675
676     """
677     return backend.VerifyNode(params[0], params[1])
678
679   @staticmethod
680   def perspective_node_start_master_daemons(params):
681     """Start the master daemons on this node.
682
683     """
684     return backend.StartMasterDaemons(params[0])
685
686   @staticmethod
687   def perspective_node_activate_master_ip(params):
688     """Activate the master IP on this node.
689
690     """
691     return backend.ActivateMasterIp()
692
693   @staticmethod
694   def perspective_node_deactivate_master_ip(params):
695     """Deactivate the master IP on this node.
696
697     """
698     return backend.DeactivateMasterIp()
699
700   @staticmethod
701   def perspective_node_stop_master(params):
702     """Deactivate the master IP and stops master daemons on this node.
703
704     Sometimes both operations need to be executed at the same time (doing one of
705     the two would make impossible to do the other one).
706
707     """
708     backend.DeactivateMasterIp()
709     return backend.StopMasterDaemons()
710
711   @staticmethod
712   def perspective_node_leave_cluster(params):
713     """Cleanup after leaving a cluster.
714
715     """
716     return backend.LeaveCluster(params[0])
717
718   @staticmethod
719   def perspective_node_volumes(params):
720     """Query the list of all logical volume groups.
721
722     """
723     return backend.NodeVolumes()
724
725   @staticmethod
726   def perspective_node_demote_from_mc(params):
727     """Demote a node from the master candidate role.
728
729     """
730     return backend.DemoteFromMC()
731
732   @staticmethod
733   def perspective_node_powercycle(params):
734     """Tries to powercycle the nod.
735
736     """
737     hypervisor_type = params[0]
738     return backend.PowercycleNode(hypervisor_type)
739
740   # cluster --------------------------
741
742   @staticmethod
743   def perspective_version(params):
744     """Query version information.
745
746     """
747     return constants.PROTOCOL_VERSION
748
749   @staticmethod
750   def perspective_upload_file(params):
751     """Upload a file.
752
753     Note that the backend implementation imposes strict rules on which
754     files are accepted.
755
756     """
757     return backend.UploadFile(*params)
758
759   @staticmethod
760   def perspective_master_info(params):
761     """Query master information.
762
763     """
764     return backend.GetMasterInfo()
765
766   @staticmethod
767   def perspective_run_oob(params):
768     """Runs oob on node.
769
770     """
771     output = backend.RunOob(params[0], params[1], params[2], params[3])
772     if output:
773       result = serializer.LoadJson(output)
774     else:
775       result = None
776     return result
777
778   @staticmethod
779   def perspective_write_ssconf_files(params):
780     """Write ssconf files.
781
782     """
783     (values,) = params
784     return backend.WriteSsconfFiles(values)
785
786   # os -----------------------
787
788   @staticmethod
789   def perspective_os_diagnose(params):
790     """Query detailed information about existing OSes.
791
792     """
793     return backend.DiagnoseOS()
794
795   @staticmethod
796   def perspective_os_get(params):
797     """Query information about a given OS.
798
799     """
800     name = params[0]
801     os_obj = backend.OSFromDisk(name)
802     return os_obj.ToDict()
803
804   @staticmethod
805   def perspective_os_validate(params):
806     """Run a given OS' validation routine.
807
808     """
809     required, name, checks, params = params
810     return backend.ValidateOS(required, name, checks, params)
811
812   # hooks -----------------------
813
814   @staticmethod
815   def perspective_hooks_runner(params):
816     """Run hook scripts.
817
818     """
819     hpath, phase, env = params
820     hr = backend.HooksRunner()
821     return hr.RunHooks(hpath, phase, env)
822
823   # iallocator -----------------
824
825   @staticmethod
826   def perspective_iallocator_runner(params):
827     """Run an iallocator script.
828
829     """
830     name, idata = params
831     iar = backend.IAllocatorRunner()
832     return iar.Run(name, idata)
833
834   # test -----------------------
835
836   @staticmethod
837   def perspective_test_delay(params):
838     """Run test delay.
839
840     """
841     duration = params[0]
842     status, rval = utils.TestDelay(duration)
843     if not status:
844       raise backend.RPCFail(rval)
845     return rval
846
847   # file storage ---------------
848
849   @staticmethod
850   def perspective_file_storage_dir_create(params):
851     """Create the file storage directory.
852
853     """
854     file_storage_dir = params[0]
855     return backend.CreateFileStorageDir(file_storage_dir)
856
857   @staticmethod
858   def perspective_file_storage_dir_remove(params):
859     """Remove the file storage directory.
860
861     """
862     file_storage_dir = params[0]
863     return backend.RemoveFileStorageDir(file_storage_dir)
864
865   @staticmethod
866   def perspective_file_storage_dir_rename(params):
867     """Rename the file storage directory.
868
869     """
870     old_file_storage_dir = params[0]
871     new_file_storage_dir = params[1]
872     return backend.RenameFileStorageDir(old_file_storage_dir,
873                                         new_file_storage_dir)
874
875   # jobs ------------------------
876
877   @staticmethod
878   @_RequireJobQueueLock
879   def perspective_jobqueue_update(params):
880     """Update job queue.
881
882     """
883     (file_name, content) = params
884     return backend.JobQueueUpdate(file_name, content)
885
886   @staticmethod
887   @_RequireJobQueueLock
888   def perspective_jobqueue_purge(params):
889     """Purge job queue.
890
891     """
892     return backend.JobQueuePurge()
893
894   @staticmethod
895   @_RequireJobQueueLock
896   def perspective_jobqueue_rename(params):
897     """Rename a job queue file.
898
899     """
900     # TODO: What if a file fails to rename?
901     return [backend.JobQueueRename(old, new) for old, new in params]
902
903   # hypervisor ---------------
904
905   @staticmethod
906   def perspective_hypervisor_validate_params(params):
907     """Validate the hypervisor parameters.
908
909     """
910     (hvname, hvparams) = params
911     return backend.ValidateHVParams(hvname, hvparams)
912
913   # Crypto
914
915   @staticmethod
916   def perspective_x509_cert_create(params):
917     """Creates a new X509 certificate for SSL/TLS.
918
919     """
920     (validity, ) = params
921     return backend.CreateX509Certificate(validity)
922
923   @staticmethod
924   def perspective_x509_cert_remove(params):
925     """Removes a X509 certificate.
926
927     """
928     (name, ) = params
929     return backend.RemoveX509Certificate(name)
930
931   # Import and export
932
933   @staticmethod
934   def perspective_import_start(params):
935     """Starts an import daemon.
936
937     """
938     (opts_s, instance, component, dest, dest_args) = params
939
940     opts = objects.ImportExportOptions.FromDict(opts_s)
941
942     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
943                                            None, None,
944                                            objects.Instance.FromDict(instance),
945                                            component, dest,
946                                            _DecodeImportExportIO(dest,
947                                                                  dest_args))
948
949   @staticmethod
950   def perspective_export_start(params):
951     """Starts an export daemon.
952
953     """
954     (opts_s, host, port, instance, component, source, source_args) = params
955
956     opts = objects.ImportExportOptions.FromDict(opts_s)
957
958     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
959                                            host, port,
960                                            objects.Instance.FromDict(instance),
961                                            component, source,
962                                            _DecodeImportExportIO(source,
963                                                                  source_args))
964
965   @staticmethod
966   def perspective_impexp_status(params):
967     """Retrieves the status of an import or export daemon.
968
969     """
970     return backend.GetImportExportStatus(params[0])
971
972   @staticmethod
973   def perspective_impexp_abort(params):
974     """Aborts an import or export.
975
976     """
977     return backend.AbortImportExport(params[0])
978
979   @staticmethod
980   def perspective_impexp_cleanup(params):
981     """Cleans up after an import or export.
982
983     """
984     return backend.CleanupImportExport(params[0])
985
986
987 def CheckNoded(_, args):
988   """Initial checks whether to run or exit with a failure.
989
990   """
991   if args: # noded doesn't take any arguments
992     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
993                           sys.argv[0])
994     sys.exit(constants.EXIT_FAILURE)
995   try:
996     codecs.lookup("string-escape")
997   except LookupError:
998     print >> sys.stderr, ("Can't load the string-escape code which is part"
999                           " of the Python installation. Is your installation"
1000                           " complete/correct? Aborting.")
1001     sys.exit(constants.EXIT_FAILURE)
1002
1003
1004 def PrepNoded(options, _):
1005   """Preparation node daemon function, executed with the PID file held.
1006
1007   """
1008   if options.mlock:
1009     request_executor_class = MlockallRequestExecutor
1010     try:
1011       utils.Mlockall()
1012     except errors.NoCtypesError:
1013       logging.warning("Cannot set memory lock, ctypes module not found")
1014       request_executor_class = http.server.HttpServerRequestExecutor
1015   else:
1016     request_executor_class = http.server.HttpServerRequestExecutor
1017
1018   # Read SSL certificate
1019   if options.ssl:
1020     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1021                                     ssl_cert_path=options.ssl_cert)
1022   else:
1023     ssl_params = None
1024
1025   err = _PrepareQueueLock()
1026   if err is not None:
1027     # this might be some kind of file-system/permission error; while
1028     # this breaks the job queue functionality, we shouldn't prevent
1029     # startup of the whole node daemon because of this
1030     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1031
1032   mainloop = daemon.Mainloop()
1033   server = NodeHttpServer(mainloop, options.bind_address, options.port,
1034                           ssl_params=ssl_params, ssl_verify_peer=True,
1035                           request_executor_class=request_executor_class)
1036   server.Start()
1037   return (mainloop, server)
1038
1039
1040 def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1041   """Main node daemon function, executed with the PID file held.
1042
1043   """
1044   (mainloop, server) = prep_data
1045   try:
1046     mainloop.Run()
1047   finally:
1048     server.Stop()
1049
1050
1051 def Main():
1052   """Main function for the node daemon.
1053
1054   """
1055   parser = OptionParser(description="Ganeti node daemon",
1056                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
1057                         version="%%prog (ganeti) %s" %
1058                         constants.RELEASE_VERSION)
1059   parser.add_option("--no-mlock", dest="mlock",
1060                     help="Do not mlock the node memory in ram",
1061                     default=True, action="store_false")
1062
1063   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1064                      default_ssl_cert=constants.NODED_CERT_FILE,
1065                      default_ssl_key=constants.NODED_CERT_FILE,
1066                      console_logging=True)