Explicitly pass params to deactivate_master_ip
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51
52 import ganeti.http.server # pylint: disable=W0611
53
54
55 queue_lock = None
56
57
58 def _PrepareQueueLock():
59   """Try to prepare the queue lock.
60
61   @return: None for success, otherwise an exception object
62
63   """
64   global queue_lock # pylint: disable=W0603
65
66   if queue_lock is not None:
67     return None
68
69   # Prepare job queue
70   try:
71     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
72     return None
73   except EnvironmentError, err:
74     return err
75
76
77 def _RequireJobQueueLock(fn):
78   """Decorator for job queue manipulating functions.
79
80   """
81   QUEUE_LOCK_TIMEOUT = 10
82
83   def wrapper(*args, **kwargs):
84     # Locking in exclusive, blocking mode because there could be several
85     # children running at the same time. Waiting up to 10 seconds.
86     if _PrepareQueueLock() is not None:
87       raise errors.JobQueueError("Job queue failed initialization,"
88                                  " cannot update jobs")
89     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
90     try:
91       return fn(*args, **kwargs)
92     finally:
93       queue_lock.Unlock()
94
95   return wrapper
96
97
98 def _DecodeImportExportIO(ieio, ieioargs):
99   """Decodes import/export I/O information.
100
101   """
102   if ieio == constants.IEIO_RAW_DISK:
103     assert len(ieioargs) == 1
104     return (objects.Disk.FromDict(ieioargs[0]), )
105
106   if ieio == constants.IEIO_SCRIPT:
107     assert len(ieioargs) == 2
108     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
109
110   return ieioargs
111
112
113 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
114   """Custom Request Executor class that ensures NodeHttpServer children are
115   locked in ram.
116
117   """
118   def __init__(self, *args, **kwargs):
119     utils.Mlockall()
120
121     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
122
123
124 class NodeHttpServer(http.server.HttpServer):
125   """The server implementation.
126
127   This class holds all methods exposed over the RPC interface.
128
129   """
130   # too many public methods, and unused args - all methods get params
131   # due to the API
132   # pylint: disable=R0904,W0613
133   def __init__(self, *args, **kwargs):
134     http.server.HttpServer.__init__(self, *args, **kwargs)
135     self.noded_pid = os.getpid()
136
137   def HandleRequest(self, req):
138     """Handle a request.
139
140     """
141     if req.request_method.upper() != http.HTTP_PUT:
142       raise http.HttpBadRequest()
143
144     path = req.request_path
145     if path.startswith("/"):
146       path = path[1:]
147
148     method = getattr(self, "perspective_%s" % path, None)
149     if method is None:
150       raise http.HttpNotFound()
151
152     try:
153       result = (True, method(serializer.LoadJson(req.request_body)))
154
155     except backend.RPCFail, err:
156       # our custom failure exception; str(err) works fine if the
157       # exception was constructed with a single argument, and in
158       # this case, err.message == err.args[0] == str(err)
159       result = (False, str(err))
160     except errors.QuitGanetiException, err:
161       # Tell parent to quit
162       logging.info("Shutting down the node daemon, arguments: %s",
163                    str(err.args))
164       os.kill(self.noded_pid, signal.SIGTERM)
165       # And return the error's arguments, which must be already in
166       # correct tuple format
167       result = err.args
168     except Exception, err:
169       logging.exception("Error in RPC call")
170       result = (False, "Error while executing backend function: %s" % str(err))
171
172     return serializer.DumpJson(result, indent=False)
173
174   # the new block devices  --------------------------
175
176   @staticmethod
177   def perspective_blockdev_create(params):
178     """Create a block device.
179
180     """
181     bdev_s, size, owner, on_primary, info = params
182     bdev = objects.Disk.FromDict(bdev_s)
183     if bdev is None:
184       raise ValueError("can't unserialize data!")
185     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
186
187   @staticmethod
188   def perspective_blockdev_pause_resume_sync(params):
189     """Pause/resume sync of a block device.
190
191     """
192     disks_s, pause = params
193     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
194     return backend.BlockdevPauseResumeSync(disks, pause)
195
196   @staticmethod
197   def perspective_blockdev_wipe(params):
198     """Wipe a block device.
199
200     """
201     bdev_s, offset, size = params
202     bdev = objects.Disk.FromDict(bdev_s)
203     return backend.BlockdevWipe(bdev, offset, size)
204
205   @staticmethod
206   def perspective_blockdev_remove(params):
207     """Remove a block device.
208
209     """
210     bdev_s = params[0]
211     bdev = objects.Disk.FromDict(bdev_s)
212     return backend.BlockdevRemove(bdev)
213
214   @staticmethod
215   def perspective_blockdev_rename(params):
216     """Remove a block device.
217
218     """
219     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
220     return backend.BlockdevRename(devlist)
221
222   @staticmethod
223   def perspective_blockdev_assemble(params):
224     """Assemble a block device.
225
226     """
227     bdev_s, owner, on_primary, idx = params
228     bdev = objects.Disk.FromDict(bdev_s)
229     if bdev is None:
230       raise ValueError("can't unserialize data!")
231     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
232
233   @staticmethod
234   def perspective_blockdev_shutdown(params):
235     """Shutdown a block device.
236
237     """
238     bdev_s = params[0]
239     bdev = objects.Disk.FromDict(bdev_s)
240     if bdev is None:
241       raise ValueError("can't unserialize data!")
242     return backend.BlockdevShutdown(bdev)
243
244   @staticmethod
245   def perspective_blockdev_addchildren(params):
246     """Add a child to a mirror device.
247
248     Note: this is only valid for mirror devices. It's the caller's duty
249     to send a correct disk, otherwise we raise an error.
250
251     """
252     bdev_s, ndev_s = params
253     bdev = objects.Disk.FromDict(bdev_s)
254     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
255     if bdev is None or ndevs.count(None) > 0:
256       raise ValueError("can't unserialize data!")
257     return backend.BlockdevAddchildren(bdev, ndevs)
258
259   @staticmethod
260   def perspective_blockdev_removechildren(params):
261     """Remove a child from a mirror device.
262
263     This is only valid for mirror devices, of course. It's the callers
264     duty to send a correct disk, otherwise we raise an error.
265
266     """
267     bdev_s, ndev_s = params
268     bdev = objects.Disk.FromDict(bdev_s)
269     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
270     if bdev is None or ndevs.count(None) > 0:
271       raise ValueError("can't unserialize data!")
272     return backend.BlockdevRemovechildren(bdev, ndevs)
273
274   @staticmethod
275   def perspective_blockdev_getmirrorstatus(params):
276     """Return the mirror status for a list of disks.
277
278     """
279     disks = [objects.Disk.FromDict(dsk_s)
280              for dsk_s in params[0]]
281     return [status.ToDict()
282             for status in backend.BlockdevGetmirrorstatus(disks)]
283
284   @staticmethod
285   def perspective_blockdev_getmirrorstatus_multi(params):
286     """Return the mirror status for a list of disks.
287
288     """
289     (node_disks, ) = params
290
291     node_name = netutils.Hostname.GetSysName()
292
293     disks = [objects.Disk.FromDict(dsk_s)
294              for dsk_s in node_disks.get(node_name, [])]
295
296     result = []
297
298     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
299       if success:
300         result.append((success, status.ToDict()))
301       else:
302         result.append((success, status))
303
304     return result
305
306   @staticmethod
307   def perspective_blockdev_find(params):
308     """Expose the FindBlockDevice functionality for a disk.
309
310     This will try to find but not activate a disk.
311
312     """
313     disk = objects.Disk.FromDict(params[0])
314
315     result = backend.BlockdevFind(disk)
316     if result is None:
317       return None
318
319     return result.ToDict()
320
321   @staticmethod
322   def perspective_blockdev_snapshot(params):
323     """Create a snapshot device.
324
325     Note that this is only valid for LVM disks, if we get passed
326     something else we raise an exception. The snapshot device can be
327     remove by calling the generic block device remove call.
328
329     """
330     cfbd = objects.Disk.FromDict(params[0])
331     return backend.BlockdevSnapshot(cfbd)
332
333   @staticmethod
334   def perspective_blockdev_grow(params):
335     """Grow a stack of devices.
336
337     """
338     cfbd = objects.Disk.FromDict(params[0])
339     amount = params[1]
340     dryrun = params[2]
341     return backend.BlockdevGrow(cfbd, amount, dryrun)
342
343   @staticmethod
344   def perspective_blockdev_close(params):
345     """Closes the given block devices.
346
347     """
348     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
349     return backend.BlockdevClose(params[0], disks)
350
351   @staticmethod
352   def perspective_blockdev_getsize(params):
353     """Compute the sizes of the given block devices.
354
355     """
356     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
357     return backend.BlockdevGetsize(disks)
358
359   @staticmethod
360   def perspective_blockdev_export(params):
361     """Compute the sizes of the given block devices.
362
363     """
364     disk = objects.Disk.FromDict(params[0])
365     dest_node, dest_path, cluster_name = params[1:]
366     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
367
368   # blockdev/drbd specific methods ----------
369
370   @staticmethod
371   def perspective_drbd_disconnect_net(params):
372     """Disconnects the network connection of drbd disks.
373
374     Note that this is only valid for drbd disks, so the members of the
375     disk list must all be drbd devices.
376
377     """
378     nodes_ip, disks = params
379     disks = [objects.Disk.FromDict(cf) for cf in disks]
380     return backend.DrbdDisconnectNet(nodes_ip, disks)
381
382   @staticmethod
383   def perspective_drbd_attach_net(params):
384     """Attaches the network connection of drbd disks.
385
386     Note that this is only valid for drbd disks, so the members of the
387     disk list must all be drbd devices.
388
389     """
390     nodes_ip, disks, instance_name, multimaster = params
391     disks = [objects.Disk.FromDict(cf) for cf in disks]
392     return backend.DrbdAttachNet(nodes_ip, disks,
393                                      instance_name, multimaster)
394
395   @staticmethod
396   def perspective_drbd_wait_sync(params):
397     """Wait until DRBD disks are synched.
398
399     Note that this is only valid for drbd disks, so the members of the
400     disk list must all be drbd devices.
401
402     """
403     nodes_ip, disks = params
404     disks = [objects.Disk.FromDict(cf) for cf in disks]
405     return backend.DrbdWaitSync(nodes_ip, disks)
406
407   @staticmethod
408   def perspective_drbd_helper(params):
409     """Query drbd helper.
410
411     """
412     return backend.GetDrbdUsermodeHelper()
413
414   # export/import  --------------------------
415
416   @staticmethod
417   def perspective_finalize_export(params):
418     """Expose the finalize export functionality.
419
420     """
421     instance = objects.Instance.FromDict(params[0])
422
423     snap_disks = []
424     for disk in params[1]:
425       if isinstance(disk, bool):
426         snap_disks.append(disk)
427       else:
428         snap_disks.append(objects.Disk.FromDict(disk))
429
430     return backend.FinalizeExport(instance, snap_disks)
431
432   @staticmethod
433   def perspective_export_info(params):
434     """Query information about an existing export on this node.
435
436     The given path may not contain an export, in which case we return
437     None.
438
439     """
440     path = params[0]
441     return backend.ExportInfo(path)
442
443   @staticmethod
444   def perspective_export_list(params):
445     """List the available exports on this node.
446
447     Note that as opposed to export_info, which may query data about an
448     export in any path, this only queries the standard Ganeti path
449     (constants.EXPORT_DIR).
450
451     """
452     return backend.ListExports()
453
454   @staticmethod
455   def perspective_export_remove(params):
456     """Remove an export.
457
458     """
459     export = params[0]
460     return backend.RemoveExport(export)
461
462   # block device ---------------------
463   @staticmethod
464   def perspective_bdev_sizes(params):
465     """Query the list of block devices
466
467     """
468     devices = params[0]
469     return backend.GetBlockDevSizes(devices)
470
471   # volume  --------------------------
472
473   @staticmethod
474   def perspective_lv_list(params):
475     """Query the list of logical volumes in a given volume group.
476
477     """
478     vgname = params[0]
479     return backend.GetVolumeList(vgname)
480
481   @staticmethod
482   def perspective_vg_list(params):
483     """Query the list of volume groups.
484
485     """
486     return backend.ListVolumeGroups()
487
488   # Storage --------------------------
489
490   @staticmethod
491   def perspective_storage_list(params):
492     """Get list of storage units.
493
494     """
495     (su_name, su_args, name, fields) = params
496     return storage.GetStorage(su_name, *su_args).List(name, fields)
497
498   @staticmethod
499   def perspective_storage_modify(params):
500     """Modify a storage unit.
501
502     """
503     (su_name, su_args, name, changes) = params
504     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
505
506   @staticmethod
507   def perspective_storage_execute(params):
508     """Execute an operation on a storage unit.
509
510     """
511     (su_name, su_args, name, op) = params
512     return storage.GetStorage(su_name, *su_args).Execute(name, op)
513
514   # bridge  --------------------------
515
516   @staticmethod
517   def perspective_bridges_exist(params):
518     """Check if all bridges given exist on this node.
519
520     """
521     bridges_list = params[0]
522     return backend.BridgesExist(bridges_list)
523
524   # instance  --------------------------
525
526   @staticmethod
527   def perspective_instance_os_add(params):
528     """Install an OS on a given instance.
529
530     """
531     inst_s = params[0]
532     inst = objects.Instance.FromDict(inst_s)
533     reinstall = params[1]
534     debug = params[2]
535     return backend.InstanceOsAdd(inst, reinstall, debug)
536
537   @staticmethod
538   def perspective_instance_run_rename(params):
539     """Runs the OS rename script for an instance.
540
541     """
542     inst_s, old_name, debug = params
543     inst = objects.Instance.FromDict(inst_s)
544     return backend.RunRenameInstance(inst, old_name, debug)
545
546   @staticmethod
547   def perspective_instance_shutdown(params):
548     """Shutdown an instance.
549
550     """
551     instance = objects.Instance.FromDict(params[0])
552     timeout = params[1]
553     return backend.InstanceShutdown(instance, timeout)
554
555   @staticmethod
556   def perspective_instance_start(params):
557     """Start an instance.
558
559     """
560     (instance_name, startup_paused) = params
561     instance = objects.Instance.FromDict(instance_name)
562     return backend.StartInstance(instance, startup_paused)
563
564   @staticmethod
565   def perspective_migration_info(params):
566     """Gather information about an instance to be migrated.
567
568     """
569     instance = objects.Instance.FromDict(params[0])
570     return backend.MigrationInfo(instance)
571
572   @staticmethod
573   def perspective_accept_instance(params):
574     """Prepare the node to accept an instance.
575
576     """
577     instance, info, target = params
578     instance = objects.Instance.FromDict(instance)
579     return backend.AcceptInstance(instance, info, target)
580
581   @staticmethod
582   def perspective_instance_finalize_migration_dst(params):
583     """Finalize the instance migration on the destination node.
584
585     """
586     instance, info, success = params
587     instance = objects.Instance.FromDict(instance)
588     return backend.FinalizeMigrationDst(instance, info, success)
589
590   @staticmethod
591   def perspective_instance_migrate(params):
592     """Migrates an instance.
593
594     """
595     instance, target, live = params
596     instance = objects.Instance.FromDict(instance)
597     return backend.MigrateInstance(instance, target, live)
598
599   @staticmethod
600   def perspective_instance_finalize_migration_src(params):
601     """Finalize the instance migration on the source node.
602
603     """
604     instance, success, live = params
605     instance = objects.Instance.FromDict(instance)
606     return backend.FinalizeMigrationSource(instance, success, live)
607
608   @staticmethod
609   def perspective_instance_get_migration_status(params):
610     """Reports migration status.
611
612     """
613     instance = objects.Instance.FromDict(params[0])
614     return backend.GetMigrationStatus(instance).ToDict()
615
616   @staticmethod
617   def perspective_instance_reboot(params):
618     """Reboot an instance.
619
620     """
621     instance = objects.Instance.FromDict(params[0])
622     reboot_type = params[1]
623     shutdown_timeout = params[2]
624     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
625
626   @staticmethod
627   def perspective_instance_info(params):
628     """Query instance information.
629
630     """
631     return backend.GetInstanceInfo(params[0], params[1])
632
633   @staticmethod
634   def perspective_instance_migratable(params):
635     """Query whether the specified instance can be migrated.
636
637     """
638     instance = objects.Instance.FromDict(params[0])
639     return backend.GetInstanceMigratable(instance)
640
641   @staticmethod
642   def perspective_all_instances_info(params):
643     """Query information about all instances.
644
645     """
646     return backend.GetAllInstancesInfo(params[0])
647
648   @staticmethod
649   def perspective_instance_list(params):
650     """Query the list of running instances.
651
652     """
653     return backend.GetInstanceList(params[0])
654
655   # node --------------------------
656
657   @staticmethod
658   def perspective_node_has_ip_address(params):
659     """Checks if a node has the given ip address.
660
661     """
662     return netutils.IPAddress.Own(params[0])
663
664   @staticmethod
665   def perspective_node_info(params):
666     """Query node information.
667
668     """
669     vgname, hypervisor_type = params
670     return backend.GetNodeInfo(vgname, hypervisor_type)
671
672   @staticmethod
673   def perspective_etc_hosts_modify(params):
674     """Modify a node entry in /etc/hosts.
675
676     """
677     backend.EtcHostsModify(params[0], params[1], params[2])
678
679     return True
680
681   @staticmethod
682   def perspective_node_verify(params):
683     """Run a verify sequence on this node.
684
685     """
686     return backend.VerifyNode(params[0], params[1])
687
688   @staticmethod
689   def perspective_node_start_master_daemons(params):
690     """Start the master daemons on this node.
691
692     """
693     return backend.StartMasterDaemons(params[0])
694
695   @staticmethod
696   def perspective_node_activate_master_ip(params):
697     """Activate the master IP on this node.
698
699     """
700     return backend.ActivateMasterIp(params[0], params[1], params[2], params[3])
701
702   @staticmethod
703   def perspective_node_deactivate_master_ip(params):
704     """Deactivate the master IP on this node.
705
706     """
707     return backend.DeactivateMasterIp(params[0], params[1], params[2])
708
709   @staticmethod
710   def perspective_node_stop_master(params):
711     """Stops master daemons on this node.
712
713     """
714     return backend.StopMasterDaemons()
715
716   @staticmethod
717   def perspective_node_change_master_netmask(params):
718     """Change the master IP netmask.
719
720     """
721     return backend.ChangeMasterNetmask(params[0])
722
723   @staticmethod
724   def perspective_node_leave_cluster(params):
725     """Cleanup after leaving a cluster.
726
727     """
728     return backend.LeaveCluster(params[0])
729
730   @staticmethod
731   def perspective_node_volumes(params):
732     """Query the list of all logical volume groups.
733
734     """
735     return backend.NodeVolumes()
736
737   @staticmethod
738   def perspective_node_demote_from_mc(params):
739     """Demote a node from the master candidate role.
740
741     """
742     return backend.DemoteFromMC()
743
744   @staticmethod
745   def perspective_node_powercycle(params):
746     """Tries to powercycle the nod.
747
748     """
749     hypervisor_type = params[0]
750     return backend.PowercycleNode(hypervisor_type)
751
752   # cluster --------------------------
753
754   @staticmethod
755   def perspective_version(params):
756     """Query version information.
757
758     """
759     return constants.PROTOCOL_VERSION
760
761   @staticmethod
762   def perspective_upload_file(params):
763     """Upload a file.
764
765     Note that the backend implementation imposes strict rules on which
766     files are accepted.
767
768     """
769     return backend.UploadFile(*(params[0]))
770
771   @staticmethod
772   def perspective_master_info(params):
773     """Query master information.
774
775     """
776     return backend.GetMasterInfo()
777
778   @staticmethod
779   def perspective_run_oob(params):
780     """Runs oob on node.
781
782     """
783     output = backend.RunOob(params[0], params[1], params[2], params[3])
784     if output:
785       result = serializer.LoadJson(output)
786     else:
787       result = None
788     return result
789
790   @staticmethod
791   def perspective_write_ssconf_files(params):
792     """Write ssconf files.
793
794     """
795     (values,) = params
796     return backend.WriteSsconfFiles(values)
797
798   # os -----------------------
799
800   @staticmethod
801   def perspective_os_diagnose(params):
802     """Query detailed information about existing OSes.
803
804     """
805     return backend.DiagnoseOS()
806
807   @staticmethod
808   def perspective_os_get(params):
809     """Query information about a given OS.
810
811     """
812     name = params[0]
813     os_obj = backend.OSFromDisk(name)
814     return os_obj.ToDict()
815
816   @staticmethod
817   def perspective_os_validate(params):
818     """Run a given OS' validation routine.
819
820     """
821     required, name, checks, params = params
822     return backend.ValidateOS(required, name, checks, params)
823
824   # hooks -----------------------
825
826   @staticmethod
827   def perspective_hooks_runner(params):
828     """Run hook scripts.
829
830     """
831     hpath, phase, env = params
832     hr = backend.HooksRunner()
833     return hr.RunHooks(hpath, phase, env)
834
835   # iallocator -----------------
836
837   @staticmethod
838   def perspective_iallocator_runner(params):
839     """Run an iallocator script.
840
841     """
842     name, idata = params
843     iar = backend.IAllocatorRunner()
844     return iar.Run(name, idata)
845
846   # test -----------------------
847
848   @staticmethod
849   def perspective_test_delay(params):
850     """Run test delay.
851
852     """
853     duration = params[0]
854     status, rval = utils.TestDelay(duration)
855     if not status:
856       raise backend.RPCFail(rval)
857     return rval
858
859   # file storage ---------------
860
861   @staticmethod
862   def perspective_file_storage_dir_create(params):
863     """Create the file storage directory.
864
865     """
866     file_storage_dir = params[0]
867     return backend.CreateFileStorageDir(file_storage_dir)
868
869   @staticmethod
870   def perspective_file_storage_dir_remove(params):
871     """Remove the file storage directory.
872
873     """
874     file_storage_dir = params[0]
875     return backend.RemoveFileStorageDir(file_storage_dir)
876
877   @staticmethod
878   def perspective_file_storage_dir_rename(params):
879     """Rename the file storage directory.
880
881     """
882     old_file_storage_dir = params[0]
883     new_file_storage_dir = params[1]
884     return backend.RenameFileStorageDir(old_file_storage_dir,
885                                         new_file_storage_dir)
886
887   # jobs ------------------------
888
889   @staticmethod
890   @_RequireJobQueueLock
891   def perspective_jobqueue_update(params):
892     """Update job queue.
893
894     """
895     (file_name, content) = params
896     return backend.JobQueueUpdate(file_name, content)
897
898   @staticmethod
899   @_RequireJobQueueLock
900   def perspective_jobqueue_purge(params):
901     """Purge job queue.
902
903     """
904     return backend.JobQueuePurge()
905
906   @staticmethod
907   @_RequireJobQueueLock
908   def perspective_jobqueue_rename(params):
909     """Rename a job queue file.
910
911     """
912     # TODO: What if a file fails to rename?
913     return [backend.JobQueueRename(old, new) for old, new in params[0]]
914
915   # hypervisor ---------------
916
917   @staticmethod
918   def perspective_hypervisor_validate_params(params):
919     """Validate the hypervisor parameters.
920
921     """
922     (hvname, hvparams) = params
923     return backend.ValidateHVParams(hvname, hvparams)
924
925   # Crypto
926
927   @staticmethod
928   def perspective_x509_cert_create(params):
929     """Creates a new X509 certificate for SSL/TLS.
930
931     """
932     (validity, ) = params
933     return backend.CreateX509Certificate(validity)
934
935   @staticmethod
936   def perspective_x509_cert_remove(params):
937     """Removes a X509 certificate.
938
939     """
940     (name, ) = params
941     return backend.RemoveX509Certificate(name)
942
943   # Import and export
944
945   @staticmethod
946   def perspective_import_start(params):
947     """Starts an import daemon.
948
949     """
950     (opts_s, instance, component, (dest, dest_args)) = params
951
952     opts = objects.ImportExportOptions.FromDict(opts_s)
953
954     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
955                                            None, None,
956                                            objects.Instance.FromDict(instance),
957                                            component, dest,
958                                            _DecodeImportExportIO(dest,
959                                                                  dest_args))
960
961   @staticmethod
962   def perspective_export_start(params):
963     """Starts an export daemon.
964
965     """
966     (opts_s, host, port, instance, component, (source, source_args)) = params
967
968     opts = objects.ImportExportOptions.FromDict(opts_s)
969
970     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
971                                            host, port,
972                                            objects.Instance.FromDict(instance),
973                                            component, source,
974                                            _DecodeImportExportIO(source,
975                                                                  source_args))
976
977   @staticmethod
978   def perspective_impexp_status(params):
979     """Retrieves the status of an import or export daemon.
980
981     """
982     return backend.GetImportExportStatus(params[0])
983
984   @staticmethod
985   def perspective_impexp_abort(params):
986     """Aborts an import or export.
987
988     """
989     return backend.AbortImportExport(params[0])
990
991   @staticmethod
992   def perspective_impexp_cleanup(params):
993     """Cleans up after an import or export.
994
995     """
996     return backend.CleanupImportExport(params[0])
997
998
999 def CheckNoded(_, args):
1000   """Initial checks whether to run or exit with a failure.
1001
1002   """
1003   if args: # noded doesn't take any arguments
1004     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1005                           sys.argv[0])
1006     sys.exit(constants.EXIT_FAILURE)
1007   try:
1008     codecs.lookup("string-escape")
1009   except LookupError:
1010     print >> sys.stderr, ("Can't load the string-escape code which is part"
1011                           " of the Python installation. Is your installation"
1012                           " complete/correct? Aborting.")
1013     sys.exit(constants.EXIT_FAILURE)
1014
1015
1016 def PrepNoded(options, _):
1017   """Preparation node daemon function, executed with the PID file held.
1018
1019   """
1020   if options.mlock:
1021     request_executor_class = MlockallRequestExecutor
1022     try:
1023       utils.Mlockall()
1024     except errors.NoCtypesError:
1025       logging.warning("Cannot set memory lock, ctypes module not found")
1026       request_executor_class = http.server.HttpServerRequestExecutor
1027   else:
1028     request_executor_class = http.server.HttpServerRequestExecutor
1029
1030   # Read SSL certificate
1031   if options.ssl:
1032     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1033                                     ssl_cert_path=options.ssl_cert)
1034   else:
1035     ssl_params = None
1036
1037   err = _PrepareQueueLock()
1038   if err is not None:
1039     # this might be some kind of file-system/permission error; while
1040     # this breaks the job queue functionality, we shouldn't prevent
1041     # startup of the whole node daemon because of this
1042     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1043
1044   mainloop = daemon.Mainloop()
1045   server = NodeHttpServer(mainloop, options.bind_address, options.port,
1046                           ssl_params=ssl_params, ssl_verify_peer=True,
1047                           request_executor_class=request_executor_class)
1048   server.Start()
1049   return (mainloop, server)
1050
1051
1052 def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1053   """Main node daemon function, executed with the PID file held.
1054
1055   """
1056   (mainloop, server) = prep_data
1057   try:
1058     mainloop.Run()
1059   finally:
1060     server.Stop()
1061
1062
1063 def Main():
1064   """Main function for the node daemon.
1065
1066   """
1067   parser = OptionParser(description="Ganeti node daemon",
1068                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
1069                         version="%%prog (ganeti) %s" %
1070                         constants.RELEASE_VERSION)
1071   parser.add_option("--no-mlock", dest="mlock",
1072                     help="Do not mlock the node memory in ram",
1073                     default=True, action="store_false")
1074
1075   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1076                      default_ssl_cert=constants.NODED_CERT_FILE,
1077                      default_ssl_key=constants.NODED_CERT_FILE,
1078                      console_logging=True)