rpc: Change {import,export}_start to take source/dest in single argument
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51
52 import ganeti.http.server # pylint: disable=W0611
53
54
55 queue_lock = None
56
57
58 def _PrepareQueueLock():
59   """Try to prepare the queue lock.
60
61   @return: None for success, otherwise an exception object
62
63   """
64   global queue_lock # pylint: disable=W0603
65
66   if queue_lock is not None:
67     return None
68
69   # Prepare job queue
70   try:
71     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
72     return None
73   except EnvironmentError, err:
74     return err
75
76
77 def _RequireJobQueueLock(fn):
78   """Decorator for job queue manipulating functions.
79
80   """
81   QUEUE_LOCK_TIMEOUT = 10
82
83   def wrapper(*args, **kwargs):
84     # Locking in exclusive, blocking mode because there could be several
85     # children running at the same time. Waiting up to 10 seconds.
86     if _PrepareQueueLock() is not None:
87       raise errors.JobQueueError("Job queue failed initialization,"
88                                  " cannot update jobs")
89     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
90     try:
91       return fn(*args, **kwargs)
92     finally:
93       queue_lock.Unlock()
94
95   return wrapper
96
97
98 def _DecodeImportExportIO(ieio, ieioargs):
99   """Decodes import/export I/O information.
100
101   """
102   if ieio == constants.IEIO_RAW_DISK:
103     assert len(ieioargs) == 1
104     return (objects.Disk.FromDict(ieioargs[0]), )
105
106   if ieio == constants.IEIO_SCRIPT:
107     assert len(ieioargs) == 2
108     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
109
110   return ieioargs
111
112
113 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
114   """Custom Request Executor class that ensures NodeHttpServer children are
115   locked in ram.
116
117   """
118   def __init__(self, *args, **kwargs):
119     utils.Mlockall()
120
121     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
122
123
124 class NodeHttpServer(http.server.HttpServer):
125   """The server implementation.
126
127   This class holds all methods exposed over the RPC interface.
128
129   """
130   # too many public methods, and unused args - all methods get params
131   # due to the API
132   # pylint: disable=R0904,W0613
133   def __init__(self, *args, **kwargs):
134     http.server.HttpServer.__init__(self, *args, **kwargs)
135     self.noded_pid = os.getpid()
136
137   def HandleRequest(self, req):
138     """Handle a request.
139
140     """
141     if req.request_method.upper() != http.HTTP_PUT:
142       raise http.HttpBadRequest()
143
144     path = req.request_path
145     if path.startswith("/"):
146       path = path[1:]
147
148     method = getattr(self, "perspective_%s" % path, None)
149     if method is None:
150       raise http.HttpNotFound()
151
152     try:
153       result = (True, method(serializer.LoadJson(req.request_body)))
154
155     except backend.RPCFail, err:
156       # our custom failure exception; str(err) works fine if the
157       # exception was constructed with a single argument, and in
158       # this case, err.message == err.args[0] == str(err)
159       result = (False, str(err))
160     except errors.QuitGanetiException, err:
161       # Tell parent to quit
162       logging.info("Shutting down the node daemon, arguments: %s",
163                    str(err.args))
164       os.kill(self.noded_pid, signal.SIGTERM)
165       # And return the error's arguments, which must be already in
166       # correct tuple format
167       result = err.args
168     except Exception, err:
169       logging.exception("Error in RPC call")
170       result = (False, "Error while executing backend function: %s" % str(err))
171
172     return serializer.DumpJson(result, indent=False)
173
174   # the new block devices  --------------------------
175
176   @staticmethod
177   def perspective_blockdev_create(params):
178     """Create a block device.
179
180     """
181     bdev_s, size, owner, on_primary, info = params
182     bdev = objects.Disk.FromDict(bdev_s)
183     if bdev is None:
184       raise ValueError("can't unserialize data!")
185     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
186
187   @staticmethod
188   def perspective_blockdev_pause_resume_sync(params):
189     """Pause/resume sync of a block device.
190
191     """
192     disks_s, pause = params
193     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
194     return backend.BlockdevPauseResumeSync(disks, pause)
195
196   @staticmethod
197   def perspective_blockdev_wipe(params):
198     """Wipe a block device.
199
200     """
201     bdev_s, offset, size = params
202     bdev = objects.Disk.FromDict(bdev_s)
203     return backend.BlockdevWipe(bdev, offset, size)
204
205   @staticmethod
206   def perspective_blockdev_remove(params):
207     """Remove a block device.
208
209     """
210     bdev_s = params[0]
211     bdev = objects.Disk.FromDict(bdev_s)
212     return backend.BlockdevRemove(bdev)
213
214   @staticmethod
215   def perspective_blockdev_rename(params):
216     """Remove a block device.
217
218     """
219     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
220     return backend.BlockdevRename(devlist)
221
222   @staticmethod
223   def perspective_blockdev_assemble(params):
224     """Assemble a block device.
225
226     """
227     bdev_s, owner, on_primary, idx = params
228     bdev = objects.Disk.FromDict(bdev_s)
229     if bdev is None:
230       raise ValueError("can't unserialize data!")
231     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
232
233   @staticmethod
234   def perspective_blockdev_shutdown(params):
235     """Shutdown a block device.
236
237     """
238     bdev_s = params[0]
239     bdev = objects.Disk.FromDict(bdev_s)
240     if bdev is None:
241       raise ValueError("can't unserialize data!")
242     return backend.BlockdevShutdown(bdev)
243
244   @staticmethod
245   def perspective_blockdev_addchildren(params):
246     """Add a child to a mirror device.
247
248     Note: this is only valid for mirror devices. It's the caller's duty
249     to send a correct disk, otherwise we raise an error.
250
251     """
252     bdev_s, ndev_s = params
253     bdev = objects.Disk.FromDict(bdev_s)
254     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
255     if bdev is None or ndevs.count(None) > 0:
256       raise ValueError("can't unserialize data!")
257     return backend.BlockdevAddchildren(bdev, ndevs)
258
259   @staticmethod
260   def perspective_blockdev_removechildren(params):
261     """Remove a child from a mirror device.
262
263     This is only valid for mirror devices, of course. It's the callers
264     duty to send a correct disk, otherwise we raise an error.
265
266     """
267     bdev_s, ndev_s = params
268     bdev = objects.Disk.FromDict(bdev_s)
269     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
270     if bdev is None or ndevs.count(None) > 0:
271       raise ValueError("can't unserialize data!")
272     return backend.BlockdevRemovechildren(bdev, ndevs)
273
274   @staticmethod
275   def perspective_blockdev_getmirrorstatus(params):
276     """Return the mirror status for a list of disks.
277
278     """
279     disks = [objects.Disk.FromDict(dsk_s)
280              for dsk_s in params[0]]
281     return [status.ToDict()
282             for status in backend.BlockdevGetmirrorstatus(disks)]
283
284   @staticmethod
285   def perspective_blockdev_getmirrorstatus_multi(params):
286     """Return the mirror status for a list of disks.
287
288     """
289     (node_disks, ) = params
290
291     node_name = netutils.Hostname.GetSysName()
292
293     disks = [objects.Disk.FromDict(dsk_s)
294              for dsk_s in node_disks.get(node_name, [])]
295
296     result = []
297
298     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
299       if success:
300         result.append((success, status.ToDict()))
301       else:
302         result.append((success, status))
303
304     return result
305
306   @staticmethod
307   def perspective_blockdev_find(params):
308     """Expose the FindBlockDevice functionality for a disk.
309
310     This will try to find but not activate a disk.
311
312     """
313     disk = objects.Disk.FromDict(params[0])
314
315     result = backend.BlockdevFind(disk)
316     if result is None:
317       return None
318
319     return result.ToDict()
320
321   @staticmethod
322   def perspective_blockdev_snapshot(params):
323     """Create a snapshot device.
324
325     Note that this is only valid for LVM disks, if we get passed
326     something else we raise an exception. The snapshot device can be
327     remove by calling the generic block device remove call.
328
329     """
330     cfbd = objects.Disk.FromDict(params[0])
331     return backend.BlockdevSnapshot(cfbd)
332
333   @staticmethod
334   def perspective_blockdev_grow(params):
335     """Grow a stack of devices.
336
337     """
338     cfbd = objects.Disk.FromDict(params[0])
339     amount = params[1]
340     dryrun = params[2]
341     return backend.BlockdevGrow(cfbd, amount, dryrun)
342
343   @staticmethod
344   def perspective_blockdev_close(params):
345     """Closes the given block devices.
346
347     """
348     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
349     return backend.BlockdevClose(params[0], disks)
350
351   @staticmethod
352   def perspective_blockdev_getsize(params):
353     """Compute the sizes of the given block devices.
354
355     """
356     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
357     return backend.BlockdevGetsize(disks)
358
359   @staticmethod
360   def perspective_blockdev_export(params):
361     """Compute the sizes of the given block devices.
362
363     """
364     disk = objects.Disk.FromDict(params[0])
365     dest_node, dest_path, cluster_name = params[1:]
366     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
367
368   # blockdev/drbd specific methods ----------
369
370   @staticmethod
371   def perspective_drbd_disconnect_net(params):
372     """Disconnects the network connection of drbd disks.
373
374     Note that this is only valid for drbd disks, so the members of the
375     disk list must all be drbd devices.
376
377     """
378     nodes_ip, disks = params
379     disks = [objects.Disk.FromDict(cf) for cf in disks]
380     return backend.DrbdDisconnectNet(nodes_ip, disks)
381
382   @staticmethod
383   def perspective_drbd_attach_net(params):
384     """Attaches the network connection of drbd disks.
385
386     Note that this is only valid for drbd disks, so the members of the
387     disk list must all be drbd devices.
388
389     """
390     nodes_ip, disks, instance_name, multimaster = params
391     disks = [objects.Disk.FromDict(cf) for cf in disks]
392     return backend.DrbdAttachNet(nodes_ip, disks,
393                                      instance_name, multimaster)
394
395   @staticmethod
396   def perspective_drbd_wait_sync(params):
397     """Wait until DRBD disks are synched.
398
399     Note that this is only valid for drbd disks, so the members of the
400     disk list must all be drbd devices.
401
402     """
403     nodes_ip, disks = params
404     disks = [objects.Disk.FromDict(cf) for cf in disks]
405     return backend.DrbdWaitSync(nodes_ip, disks)
406
407   @staticmethod
408   def perspective_drbd_helper(params):
409     """Query drbd helper.
410
411     """
412     return backend.GetDrbdUsermodeHelper()
413
414   # export/import  --------------------------
415
416   @staticmethod
417   def perspective_finalize_export(params):
418     """Expose the finalize export functionality.
419
420     """
421     instance = objects.Instance.FromDict(params[0])
422
423     snap_disks = []
424     for disk in params[1]:
425       if isinstance(disk, bool):
426         snap_disks.append(disk)
427       else:
428         snap_disks.append(objects.Disk.FromDict(disk))
429
430     return backend.FinalizeExport(instance, snap_disks)
431
432   @staticmethod
433   def perspective_export_info(params):
434     """Query information about an existing export on this node.
435
436     The given path may not contain an export, in which case we return
437     None.
438
439     """
440     path = params[0]
441     return backend.ExportInfo(path)
442
443   @staticmethod
444   def perspective_export_list(params):
445     """List the available exports on this node.
446
447     Note that as opposed to export_info, which may query data about an
448     export in any path, this only queries the standard Ganeti path
449     (constants.EXPORT_DIR).
450
451     """
452     return backend.ListExports()
453
454   @staticmethod
455   def perspective_export_remove(params):
456     """Remove an export.
457
458     """
459     export = params[0]
460     return backend.RemoveExport(export)
461
462   # block device ---------------------
463   @staticmethod
464   def perspective_bdev_sizes(params):
465     """Query the list of block devices
466
467     """
468     devices = params[0]
469     return backend.GetBlockDevSizes(devices)
470
471   # volume  --------------------------
472
473   @staticmethod
474   def perspective_lv_list(params):
475     """Query the list of logical volumes in a given volume group.
476
477     """
478     vgname = params[0]
479     return backend.GetVolumeList(vgname)
480
481   @staticmethod
482   def perspective_vg_list(params):
483     """Query the list of volume groups.
484
485     """
486     return backend.ListVolumeGroups()
487
488   # Storage --------------------------
489
490   @staticmethod
491   def perspective_storage_list(params):
492     """Get list of storage units.
493
494     """
495     (su_name, su_args, name, fields) = params
496     return storage.GetStorage(su_name, *su_args).List(name, fields)
497
498   @staticmethod
499   def perspective_storage_modify(params):
500     """Modify a storage unit.
501
502     """
503     (su_name, su_args, name, changes) = params
504     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
505
506   @staticmethod
507   def perspective_storage_execute(params):
508     """Execute an operation on a storage unit.
509
510     """
511     (su_name, su_args, name, op) = params
512     return storage.GetStorage(su_name, *su_args).Execute(name, op)
513
514   # bridge  --------------------------
515
516   @staticmethod
517   def perspective_bridges_exist(params):
518     """Check if all bridges given exist on this node.
519
520     """
521     bridges_list = params[0]
522     return backend.BridgesExist(bridges_list)
523
524   # instance  --------------------------
525
526   @staticmethod
527   def perspective_instance_os_add(params):
528     """Install an OS on a given instance.
529
530     """
531     inst_s = params[0]
532     inst = objects.Instance.FromDict(inst_s)
533     reinstall = params[1]
534     debug = params[2]
535     return backend.InstanceOsAdd(inst, reinstall, debug)
536
537   @staticmethod
538   def perspective_instance_run_rename(params):
539     """Runs the OS rename script for an instance.
540
541     """
542     inst_s, old_name, debug = params
543     inst = objects.Instance.FromDict(inst_s)
544     return backend.RunRenameInstance(inst, old_name, debug)
545
546   @staticmethod
547   def perspective_instance_shutdown(params):
548     """Shutdown an instance.
549
550     """
551     instance = objects.Instance.FromDict(params[0])
552     timeout = params[1]
553     return backend.InstanceShutdown(instance, timeout)
554
555   @staticmethod
556   def perspective_instance_start(params):
557     """Start an instance.
558
559     """
560     (instance_name, startup_paused) = params
561     instance = objects.Instance.FromDict(instance_name)
562     return backend.StartInstance(instance, startup_paused)
563
564   @staticmethod
565   def perspective_migration_info(params):
566     """Gather information about an instance to be migrated.
567
568     """
569     instance = objects.Instance.FromDict(params[0])
570     return backend.MigrationInfo(instance)
571
572   @staticmethod
573   def perspective_accept_instance(params):
574     """Prepare the node to accept an instance.
575
576     """
577     instance, info, target = params
578     instance = objects.Instance.FromDict(instance)
579     return backend.AcceptInstance(instance, info, target)
580
581   @staticmethod
582   def perspective_instance_finalize_migration_dst(params):
583     """Finalize the instance migration on the destination node.
584
585     """
586     instance, info, success = params
587     instance = objects.Instance.FromDict(instance)
588     return backend.FinalizeMigrationDst(instance, info, success)
589
590   @staticmethod
591   def perspective_instance_migrate(params):
592     """Migrates an instance.
593
594     """
595     instance, target, live = params
596     instance = objects.Instance.FromDict(instance)
597     return backend.MigrateInstance(instance, target, live)
598
599   @staticmethod
600   def perspective_instance_finalize_migration_src(params):
601     """Finalize the instance migration on the source node.
602
603     """
604     instance, success, live = params
605     instance = objects.Instance.FromDict(instance)
606     return backend.FinalizeMigrationSource(instance, success, live)
607
608   @staticmethod
609   def perspective_instance_get_migration_status(params):
610     """Reports migration status.
611
612     """
613     instance = objects.Instance.FromDict(params[0])
614     return backend.GetMigrationStatus(instance).ToDict()
615
616   @staticmethod
617   def perspective_instance_reboot(params):
618     """Reboot an instance.
619
620     """
621     instance = objects.Instance.FromDict(params[0])
622     reboot_type = params[1]
623     shutdown_timeout = params[2]
624     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
625
626   @staticmethod
627   def perspective_instance_info(params):
628     """Query instance information.
629
630     """
631     return backend.GetInstanceInfo(params[0], params[1])
632
633   @staticmethod
634   def perspective_instance_migratable(params):
635     """Query whether the specified instance can be migrated.
636
637     """
638     instance = objects.Instance.FromDict(params[0])
639     return backend.GetInstanceMigratable(instance)
640
641   @staticmethod
642   def perspective_all_instances_info(params):
643     """Query information about all instances.
644
645     """
646     return backend.GetAllInstancesInfo(params[0])
647
648   @staticmethod
649   def perspective_instance_list(params):
650     """Query the list of running instances.
651
652     """
653     return backend.GetInstanceList(params[0])
654
655   # node --------------------------
656
657   @staticmethod
658   def perspective_node_has_ip_address(params):
659     """Checks if a node has the given ip address.
660
661     """
662     return netutils.IPAddress.Own(params[0])
663
664   @staticmethod
665   def perspective_node_info(params):
666     """Query node information.
667
668     """
669     vgname, hypervisor_type = params
670     return backend.GetNodeInfo(vgname, hypervisor_type)
671
672   @staticmethod
673   def perspective_etc_hosts_modify(params):
674     """Modify a node entry in /etc/hosts.
675
676     """
677     backend.EtcHostsModify(params[0], params[1], params[2])
678
679     return True
680
681   @staticmethod
682   def perspective_node_verify(params):
683     """Run a verify sequence on this node.
684
685     """
686     return backend.VerifyNode(params[0], params[1])
687
688   @staticmethod
689   def perspective_node_start_master_daemons(params):
690     """Start the master daemons on this node.
691
692     """
693     return backend.StartMasterDaemons(params[0])
694
695   @staticmethod
696   def perspective_node_activate_master_ip(params):
697     """Activate the master IP on this node.
698
699     """
700     return backend.ActivateMasterIp()
701
702   @staticmethod
703   def perspective_node_deactivate_master_ip(params):
704     """Deactivate the master IP on this node.
705
706     """
707     return backend.DeactivateMasterIp()
708
709   @staticmethod
710   def perspective_node_stop_master(params):
711     """Deactivate the master IP and stops master daemons on this node.
712
713     Sometimes both operations need to be executed at the same time (doing one of
714     the two would make impossible to do the other one).
715
716     """
717     backend.DeactivateMasterIp()
718     return backend.StopMasterDaemons()
719
720   @staticmethod
721   def perspective_node_change_master_netmask(params):
722     """Change the master IP netmask.
723
724     """
725     return backend.ChangeMasterNetmask(params[0])
726
727   @staticmethod
728   def perspective_node_leave_cluster(params):
729     """Cleanup after leaving a cluster.
730
731     """
732     return backend.LeaveCluster(params[0])
733
734   @staticmethod
735   def perspective_node_volumes(params):
736     """Query the list of all logical volume groups.
737
738     """
739     return backend.NodeVolumes()
740
741   @staticmethod
742   def perspective_node_demote_from_mc(params):
743     """Demote a node from the master candidate role.
744
745     """
746     return backend.DemoteFromMC()
747
748   @staticmethod
749   def perspective_node_powercycle(params):
750     """Tries to powercycle the nod.
751
752     """
753     hypervisor_type = params[0]
754     return backend.PowercycleNode(hypervisor_type)
755
756   # cluster --------------------------
757
758   @staticmethod
759   def perspective_version(params):
760     """Query version information.
761
762     """
763     return constants.PROTOCOL_VERSION
764
765   @staticmethod
766   def perspective_upload_file(params):
767     """Upload a file.
768
769     Note that the backend implementation imposes strict rules on which
770     files are accepted.
771
772     """
773     return backend.UploadFile(*(params[0]))
774
775   @staticmethod
776   def perspective_master_info(params):
777     """Query master information.
778
779     """
780     return backend.GetMasterInfo()
781
782   @staticmethod
783   def perspective_run_oob(params):
784     """Runs oob on node.
785
786     """
787     output = backend.RunOob(params[0], params[1], params[2], params[3])
788     if output:
789       result = serializer.LoadJson(output)
790     else:
791       result = None
792     return result
793
794   @staticmethod
795   def perspective_write_ssconf_files(params):
796     """Write ssconf files.
797
798     """
799     (values,) = params
800     return backend.WriteSsconfFiles(values)
801
802   # os -----------------------
803
804   @staticmethod
805   def perspective_os_diagnose(params):
806     """Query detailed information about existing OSes.
807
808     """
809     return backend.DiagnoseOS()
810
811   @staticmethod
812   def perspective_os_get(params):
813     """Query information about a given OS.
814
815     """
816     name = params[0]
817     os_obj = backend.OSFromDisk(name)
818     return os_obj.ToDict()
819
820   @staticmethod
821   def perspective_os_validate(params):
822     """Run a given OS' validation routine.
823
824     """
825     required, name, checks, params = params
826     return backend.ValidateOS(required, name, checks, params)
827
828   # hooks -----------------------
829
830   @staticmethod
831   def perspective_hooks_runner(params):
832     """Run hook scripts.
833
834     """
835     hpath, phase, env = params
836     hr = backend.HooksRunner()
837     return hr.RunHooks(hpath, phase, env)
838
839   # iallocator -----------------
840
841   @staticmethod
842   def perspective_iallocator_runner(params):
843     """Run an iallocator script.
844
845     """
846     name, idata = params
847     iar = backend.IAllocatorRunner()
848     return iar.Run(name, idata)
849
850   # test -----------------------
851
852   @staticmethod
853   def perspective_test_delay(params):
854     """Run test delay.
855
856     """
857     duration = params[0]
858     status, rval = utils.TestDelay(duration)
859     if not status:
860       raise backend.RPCFail(rval)
861     return rval
862
863   # file storage ---------------
864
865   @staticmethod
866   def perspective_file_storage_dir_create(params):
867     """Create the file storage directory.
868
869     """
870     file_storage_dir = params[0]
871     return backend.CreateFileStorageDir(file_storage_dir)
872
873   @staticmethod
874   def perspective_file_storage_dir_remove(params):
875     """Remove the file storage directory.
876
877     """
878     file_storage_dir = params[0]
879     return backend.RemoveFileStorageDir(file_storage_dir)
880
881   @staticmethod
882   def perspective_file_storage_dir_rename(params):
883     """Rename the file storage directory.
884
885     """
886     old_file_storage_dir = params[0]
887     new_file_storage_dir = params[1]
888     return backend.RenameFileStorageDir(old_file_storage_dir,
889                                         new_file_storage_dir)
890
891   # jobs ------------------------
892
893   @staticmethod
894   @_RequireJobQueueLock
895   def perspective_jobqueue_update(params):
896     """Update job queue.
897
898     """
899     (file_name, content) = params
900     return backend.JobQueueUpdate(file_name, content)
901
902   @staticmethod
903   @_RequireJobQueueLock
904   def perspective_jobqueue_purge(params):
905     """Purge job queue.
906
907     """
908     return backend.JobQueuePurge()
909
910   @staticmethod
911   @_RequireJobQueueLock
912   def perspective_jobqueue_rename(params):
913     """Rename a job queue file.
914
915     """
916     # TODO: What if a file fails to rename?
917     return [backend.JobQueueRename(old, new) for old, new in params[0]]
918
919   # hypervisor ---------------
920
921   @staticmethod
922   def perspective_hypervisor_validate_params(params):
923     """Validate the hypervisor parameters.
924
925     """
926     (hvname, hvparams) = params
927     return backend.ValidateHVParams(hvname, hvparams)
928
929   # Crypto
930
931   @staticmethod
932   def perspective_x509_cert_create(params):
933     """Creates a new X509 certificate for SSL/TLS.
934
935     """
936     (validity, ) = params
937     return backend.CreateX509Certificate(validity)
938
939   @staticmethod
940   def perspective_x509_cert_remove(params):
941     """Removes a X509 certificate.
942
943     """
944     (name, ) = params
945     return backend.RemoveX509Certificate(name)
946
947   # Import and export
948
949   @staticmethod
950   def perspective_import_start(params):
951     """Starts an import daemon.
952
953     """
954     (opts_s, instance, component, (dest, dest_args)) = params
955
956     opts = objects.ImportExportOptions.FromDict(opts_s)
957
958     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
959                                            None, None,
960                                            objects.Instance.FromDict(instance),
961                                            component, dest,
962                                            _DecodeImportExportIO(dest,
963                                                                  dest_args))
964
965   @staticmethod
966   def perspective_export_start(params):
967     """Starts an export daemon.
968
969     """
970     (opts_s, host, port, instance, component, (source, source_args)) = params
971
972     opts = objects.ImportExportOptions.FromDict(opts_s)
973
974     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
975                                            host, port,
976                                            objects.Instance.FromDict(instance),
977                                            component, source,
978                                            _DecodeImportExportIO(source,
979                                                                  source_args))
980
981   @staticmethod
982   def perspective_impexp_status(params):
983     """Retrieves the status of an import or export daemon.
984
985     """
986     return backend.GetImportExportStatus(params[0])
987
988   @staticmethod
989   def perspective_impexp_abort(params):
990     """Aborts an import or export.
991
992     """
993     return backend.AbortImportExport(params[0])
994
995   @staticmethod
996   def perspective_impexp_cleanup(params):
997     """Cleans up after an import or export.
998
999     """
1000     return backend.CleanupImportExport(params[0])
1001
1002
1003 def CheckNoded(_, args):
1004   """Initial checks whether to run or exit with a failure.
1005
1006   """
1007   if args: # noded doesn't take any arguments
1008     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1009                           sys.argv[0])
1010     sys.exit(constants.EXIT_FAILURE)
1011   try:
1012     codecs.lookup("string-escape")
1013   except LookupError:
1014     print >> sys.stderr, ("Can't load the string-escape code which is part"
1015                           " of the Python installation. Is your installation"
1016                           " complete/correct? Aborting.")
1017     sys.exit(constants.EXIT_FAILURE)
1018
1019
1020 def PrepNoded(options, _):
1021   """Preparation node daemon function, executed with the PID file held.
1022
1023   """
1024   if options.mlock:
1025     request_executor_class = MlockallRequestExecutor
1026     try:
1027       utils.Mlockall()
1028     except errors.NoCtypesError:
1029       logging.warning("Cannot set memory lock, ctypes module not found")
1030       request_executor_class = http.server.HttpServerRequestExecutor
1031   else:
1032     request_executor_class = http.server.HttpServerRequestExecutor
1033
1034   # Read SSL certificate
1035   if options.ssl:
1036     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1037                                     ssl_cert_path=options.ssl_cert)
1038   else:
1039     ssl_params = None
1040
1041   err = _PrepareQueueLock()
1042   if err is not None:
1043     # this might be some kind of file-system/permission error; while
1044     # this breaks the job queue functionality, we shouldn't prevent
1045     # startup of the whole node daemon because of this
1046     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1047
1048   mainloop = daemon.Mainloop()
1049   server = NodeHttpServer(mainloop, options.bind_address, options.port,
1050                           ssl_params=ssl_params, ssl_verify_peer=True,
1051                           request_executor_class=request_executor_class)
1052   server.Start()
1053   return (mainloop, server)
1054
1055
1056 def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1057   """Main node daemon function, executed with the PID file held.
1058
1059   """
1060   (mainloop, server) = prep_data
1061   try:
1062     mainloop.Run()
1063   finally:
1064     server.Stop()
1065
1066
1067 def Main():
1068   """Main function for the node daemon.
1069
1070   """
1071   parser = OptionParser(description="Ganeti node daemon",
1072                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
1073                         version="%%prog (ganeti) %s" %
1074                         constants.RELEASE_VERSION)
1075   parser.add_option("--no-mlock", dest="mlock",
1076                     help="Do not mlock the node memory in ram",
1077                     default=True, action="store_false")
1078
1079   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1080                      default_ssl_cert=constants.NODED_CERT_FILE,
1081                      default_ssl_key=constants.NODED_CERT_FILE,
1082                      console_logging=True)