Replicate queue drain flag across all master candidates
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti import storage
49 from ganeti import serializer
50 from ganeti import netutils
51 from ganeti import pathutils
52 from ganeti import ssconf
53
54 import ganeti.http.server # pylint: disable=W0611
55
56
57 queue_lock = None
58
59
60 def _PrepareQueueLock():
61   """Try to prepare the queue lock.
62
63   @return: None for success, otherwise an exception object
64
65   """
66   global queue_lock # pylint: disable=W0603
67
68   if queue_lock is not None:
69     return None
70
71   # Prepare job queue
72   try:
73     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
74     return None
75   except EnvironmentError, err:
76     return err
77
78
79 def _RequireJobQueueLock(fn):
80   """Decorator for job queue manipulating functions.
81
82   """
83   QUEUE_LOCK_TIMEOUT = 10
84
85   def wrapper(*args, **kwargs):
86     # Locking in exclusive, blocking mode because there could be several
87     # children running at the same time. Waiting up to 10 seconds.
88     if _PrepareQueueLock() is not None:
89       raise errors.JobQueueError("Job queue failed initialization,"
90                                  " cannot update jobs")
91     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
92     try:
93       return fn(*args, **kwargs)
94     finally:
95       queue_lock.Unlock()
96
97   return wrapper
98
99
100 def _DecodeImportExportIO(ieio, ieioargs):
101   """Decodes import/export I/O information.
102
103   """
104   if ieio == constants.IEIO_RAW_DISK:
105     assert len(ieioargs) == 1
106     return (objects.Disk.FromDict(ieioargs[0]), )
107
108   if ieio == constants.IEIO_SCRIPT:
109     assert len(ieioargs) == 2
110     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
111
112   return ieioargs
113
114
115 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
116   """Subclass ensuring request handlers are locked in RAM.
117
118   """
119   def __init__(self, *args, **kwargs):
120     utils.Mlockall()
121
122     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
123
124
125 class NodeRequestHandler(http.server.HttpServerHandler):
126   """The server implementation.
127
128   This class holds all methods exposed over the RPC interface.
129
130   """
131   # too many public methods, and unused args - all methods get params
132   # due to the API
133   # pylint: disable=R0904,W0613
134   def __init__(self):
135     http.server.HttpServerHandler.__init__(self)
136     self.noded_pid = os.getpid()
137
138   def HandleRequest(self, req):
139     """Handle a request.
140
141     """
142     if req.request_method.upper() != http.HTTP_POST:
143       raise http.HttpBadRequest("Only the POST method is supported")
144
145     path = req.request_path
146     if path.startswith("/"):
147       path = path[1:]
148
149     method = getattr(self, "perspective_%s" % path, None)
150     if method is None:
151       raise http.HttpNotFound()
152
153     try:
154       result = (True, method(serializer.LoadJson(req.request_body)))
155
156     except backend.RPCFail, err:
157       # our custom failure exception; str(err) works fine if the
158       # exception was constructed with a single argument, and in
159       # this case, err.message == err.args[0] == str(err)
160       result = (False, str(err))
161     except errors.QuitGanetiException, err:
162       # Tell parent to quit
163       logging.info("Shutting down the node daemon, arguments: %s",
164                    str(err.args))
165       os.kill(self.noded_pid, signal.SIGTERM)
166       # And return the error's arguments, which must be already in
167       # correct tuple format
168       result = err.args
169     except Exception, err:
170       logging.exception("Error in RPC call")
171       result = (False, "Error while executing backend function: %s" % str(err))
172
173     return serializer.DumpJson(result)
174
175   # the new block devices  --------------------------
176
177   @staticmethod
178   def perspective_blockdev_create(params):
179     """Create a block device.
180
181     """
182     bdev_s, size, owner, on_primary, info = params
183     bdev = objects.Disk.FromDict(bdev_s)
184     if bdev is None:
185       raise ValueError("can't unserialize data!")
186     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
187
188   @staticmethod
189   def perspective_blockdev_pause_resume_sync(params):
190     """Pause/resume sync of a block device.
191
192     """
193     disks_s, pause = params
194     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
195     return backend.BlockdevPauseResumeSync(disks, pause)
196
197   @staticmethod
198   def perspective_blockdev_wipe(params):
199     """Wipe a block device.
200
201     """
202     bdev_s, offset, size = params
203     bdev = objects.Disk.FromDict(bdev_s)
204     return backend.BlockdevWipe(bdev, offset, size)
205
206   @staticmethod
207   def perspective_blockdev_remove(params):
208     """Remove a block device.
209
210     """
211     bdev_s = params[0]
212     bdev = objects.Disk.FromDict(bdev_s)
213     return backend.BlockdevRemove(bdev)
214
215   @staticmethod
216   def perspective_blockdev_rename(params):
217     """Remove a block device.
218
219     """
220     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
221     return backend.BlockdevRename(devlist)
222
223   @staticmethod
224   def perspective_blockdev_assemble(params):
225     """Assemble a block device.
226
227     """
228     bdev_s, owner, on_primary, idx = params
229     bdev = objects.Disk.FromDict(bdev_s)
230     if bdev is None:
231       raise ValueError("can't unserialize data!")
232     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
233
234   @staticmethod
235   def perspective_blockdev_shutdown(params):
236     """Shutdown a block device.
237
238     """
239     bdev_s = params[0]
240     bdev = objects.Disk.FromDict(bdev_s)
241     if bdev is None:
242       raise ValueError("can't unserialize data!")
243     return backend.BlockdevShutdown(bdev)
244
245   @staticmethod
246   def perspective_blockdev_addchildren(params):
247     """Add a child to a mirror device.
248
249     Note: this is only valid for mirror devices. It's the caller's duty
250     to send a correct disk, otherwise we raise an error.
251
252     """
253     bdev_s, ndev_s = params
254     bdev = objects.Disk.FromDict(bdev_s)
255     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
256     if bdev is None or ndevs.count(None) > 0:
257       raise ValueError("can't unserialize data!")
258     return backend.BlockdevAddchildren(bdev, ndevs)
259
260   @staticmethod
261   def perspective_blockdev_removechildren(params):
262     """Remove a child from a mirror device.
263
264     This is only valid for mirror devices, of course. It's the callers
265     duty to send a correct disk, otherwise we raise an error.
266
267     """
268     bdev_s, ndev_s = params
269     bdev = objects.Disk.FromDict(bdev_s)
270     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
271     if bdev is None or ndevs.count(None) > 0:
272       raise ValueError("can't unserialize data!")
273     return backend.BlockdevRemovechildren(bdev, ndevs)
274
275   @staticmethod
276   def perspective_blockdev_getmirrorstatus(params):
277     """Return the mirror status for a list of disks.
278
279     """
280     disks = [objects.Disk.FromDict(dsk_s)
281              for dsk_s in params[0]]
282     return [status.ToDict()
283             for status in backend.BlockdevGetmirrorstatus(disks)]
284
285   @staticmethod
286   def perspective_blockdev_getmirrorstatus_multi(params):
287     """Return the mirror status for a list of disks.
288
289     """
290     (node_disks, ) = params
291
292     disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
293
294     result = []
295
296     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
297       if success:
298         result.append((success, status.ToDict()))
299       else:
300         result.append((success, status))
301
302     return result
303
304   @staticmethod
305   def perspective_blockdev_find(params):
306     """Expose the FindBlockDevice functionality for a disk.
307
308     This will try to find but not activate a disk.
309
310     """
311     disk = objects.Disk.FromDict(params[0])
312
313     result = backend.BlockdevFind(disk)
314     if result is None:
315       return None
316
317     return result.ToDict()
318
319   @staticmethod
320   def perspective_blockdev_snapshot(params):
321     """Create a snapshot device.
322
323     Note that this is only valid for LVM disks, if we get passed
324     something else we raise an exception. The snapshot device can be
325     remove by calling the generic block device remove call.
326
327     """
328     cfbd = objects.Disk.FromDict(params[0])
329     return backend.BlockdevSnapshot(cfbd)
330
331   @staticmethod
332   def perspective_blockdev_grow(params):
333     """Grow a stack of devices.
334
335     """
336     if len(params) < 4:
337       raise ValueError("Received only 3 parameters in blockdev_grow,"
338                        " old master?")
339     cfbd = objects.Disk.FromDict(params[0])
340     amount = params[1]
341     dryrun = params[2]
342     backingstore = params[3]
343     return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore)
344
345   @staticmethod
346   def perspective_blockdev_close(params):
347     """Closes the given block devices.
348
349     """
350     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
351     return backend.BlockdevClose(params[0], disks)
352
353   @staticmethod
354   def perspective_blockdev_getsize(params):
355     """Compute the sizes of the given block devices.
356
357     """
358     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
359     return backend.BlockdevGetsize(disks)
360
361   @staticmethod
362   def perspective_blockdev_export(params):
363     """Compute the sizes of the given block devices.
364
365     """
366     disk = objects.Disk.FromDict(params[0])
367     dest_node, dest_path, cluster_name = params[1:]
368     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
369
370   @staticmethod
371   def perspective_blockdev_setinfo(params):
372     """Sets metadata information on the given block device.
373
374     """
375     (disk, info) = params
376     disk = objects.Disk.FromDict(disk)
377     return backend.BlockdevSetInfo(disk, info)
378
379   # blockdev/drbd specific methods ----------
380
381   @staticmethod
382   def perspective_drbd_disconnect_net(params):
383     """Disconnects the network connection of drbd disks.
384
385     Note that this is only valid for drbd disks, so the members of the
386     disk list must all be drbd devices.
387
388     """
389     nodes_ip, disks = params
390     disks = [objects.Disk.FromDict(cf) for cf in disks]
391     return backend.DrbdDisconnectNet(nodes_ip, disks)
392
393   @staticmethod
394   def perspective_drbd_attach_net(params):
395     """Attaches the network connection of drbd disks.
396
397     Note that this is only valid for drbd disks, so the members of the
398     disk list must all be drbd devices.
399
400     """
401     nodes_ip, disks, instance_name, multimaster = params
402     disks = [objects.Disk.FromDict(cf) for cf in disks]
403     return backend.DrbdAttachNet(nodes_ip, disks,
404                                      instance_name, multimaster)
405
406   @staticmethod
407   def perspective_drbd_wait_sync(params):
408     """Wait until DRBD disks are synched.
409
410     Note that this is only valid for drbd disks, so the members of the
411     disk list must all be drbd devices.
412
413     """
414     nodes_ip, disks = params
415     disks = [objects.Disk.FromDict(cf) for cf in disks]
416     return backend.DrbdWaitSync(nodes_ip, disks)
417
418   @staticmethod
419   def perspective_drbd_helper(params):
420     """Query drbd helper.
421
422     """
423     return backend.GetDrbdUsermodeHelper()
424
425   # export/import  --------------------------
426
427   @staticmethod
428   def perspective_finalize_export(params):
429     """Expose the finalize export functionality.
430
431     """
432     instance = objects.Instance.FromDict(params[0])
433
434     snap_disks = []
435     for disk in params[1]:
436       if isinstance(disk, bool):
437         snap_disks.append(disk)
438       else:
439         snap_disks.append(objects.Disk.FromDict(disk))
440
441     return backend.FinalizeExport(instance, snap_disks)
442
443   @staticmethod
444   def perspective_export_info(params):
445     """Query information about an existing export on this node.
446
447     The given path may not contain an export, in which case we return
448     None.
449
450     """
451     path = params[0]
452     return backend.ExportInfo(path)
453
454   @staticmethod
455   def perspective_export_list(params):
456     """List the available exports on this node.
457
458     Note that as opposed to export_info, which may query data about an
459     export in any path, this only queries the standard Ganeti path
460     (pathutils.EXPORT_DIR).
461
462     """
463     return backend.ListExports()
464
465   @staticmethod
466   def perspective_export_remove(params):
467     """Remove an export.
468
469     """
470     export = params[0]
471     return backend.RemoveExport(export)
472
473   # block device ---------------------
474   @staticmethod
475   def perspective_bdev_sizes(params):
476     """Query the list of block devices
477
478     """
479     devices = params[0]
480     return backend.GetBlockDevSizes(devices)
481
482   # volume  --------------------------
483
484   @staticmethod
485   def perspective_lv_list(params):
486     """Query the list of logical volumes in a given volume group.
487
488     """
489     vgname = params[0]
490     return backend.GetVolumeList(vgname)
491
492   @staticmethod
493   def perspective_vg_list(params):
494     """Query the list of volume groups.
495
496     """
497     return backend.ListVolumeGroups()
498
499   # Storage --------------------------
500
501   @staticmethod
502   def perspective_storage_list(params):
503     """Get list of storage units.
504
505     """
506     (su_name, su_args, name, fields) = params
507     return storage.GetStorage(su_name, *su_args).List(name, fields)
508
509   @staticmethod
510   def perspective_storage_modify(params):
511     """Modify a storage unit.
512
513     """
514     (su_name, su_args, name, changes) = params
515     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
516
517   @staticmethod
518   def perspective_storage_execute(params):
519     """Execute an operation on a storage unit.
520
521     """
522     (su_name, su_args, name, op) = params
523     return storage.GetStorage(su_name, *su_args).Execute(name, op)
524
525   # bridge  --------------------------
526
527   @staticmethod
528   def perspective_bridges_exist(params):
529     """Check if all bridges given exist on this node.
530
531     """
532     bridges_list = params[0]
533     return backend.BridgesExist(bridges_list)
534
535   # instance  --------------------------
536
537   @staticmethod
538   def perspective_instance_os_add(params):
539     """Install an OS on a given instance.
540
541     """
542     inst_s = params[0]
543     inst = objects.Instance.FromDict(inst_s)
544     reinstall = params[1]
545     debug = params[2]
546     return backend.InstanceOsAdd(inst, reinstall, debug)
547
548   @staticmethod
549   def perspective_instance_run_rename(params):
550     """Runs the OS rename script for an instance.
551
552     """
553     inst_s, old_name, debug = params
554     inst = objects.Instance.FromDict(inst_s)
555     return backend.RunRenameInstance(inst, old_name, debug)
556
557   @staticmethod
558   def perspective_instance_shutdown(params):
559     """Shutdown an instance.
560
561     """
562     instance = objects.Instance.FromDict(params[0])
563     timeout = params[1]
564     return backend.InstanceShutdown(instance, timeout)
565
566   @staticmethod
567   def perspective_instance_start(params):
568     """Start an instance.
569
570     """
571     (instance_name, startup_paused) = params
572     instance = objects.Instance.FromDict(instance_name)
573     return backend.StartInstance(instance, startup_paused)
574
575   @staticmethod
576   def perspective_migration_info(params):
577     """Gather information about an instance to be migrated.
578
579     """
580     instance = objects.Instance.FromDict(params[0])
581     return backend.MigrationInfo(instance)
582
583   @staticmethod
584   def perspective_accept_instance(params):
585     """Prepare the node to accept an instance.
586
587     """
588     instance, info, target = params
589     instance = objects.Instance.FromDict(instance)
590     return backend.AcceptInstance(instance, info, target)
591
592   @staticmethod
593   def perspective_instance_finalize_migration_dst(params):
594     """Finalize the instance migration on the destination node.
595
596     """
597     instance, info, success = params
598     instance = objects.Instance.FromDict(instance)
599     return backend.FinalizeMigrationDst(instance, info, success)
600
601   @staticmethod
602   def perspective_instance_migrate(params):
603     """Migrates an instance.
604
605     """
606     instance, target, live = params
607     instance = objects.Instance.FromDict(instance)
608     return backend.MigrateInstance(instance, target, live)
609
610   @staticmethod
611   def perspective_instance_finalize_migration_src(params):
612     """Finalize the instance migration on the source node.
613
614     """
615     instance, success, live = params
616     instance = objects.Instance.FromDict(instance)
617     return backend.FinalizeMigrationSource(instance, success, live)
618
619   @staticmethod
620   def perspective_instance_get_migration_status(params):
621     """Reports migration status.
622
623     """
624     instance = objects.Instance.FromDict(params[0])
625     return backend.GetMigrationStatus(instance).ToDict()
626
627   @staticmethod
628   def perspective_instance_reboot(params):
629     """Reboot an instance.
630
631     """
632     instance = objects.Instance.FromDict(params[0])
633     reboot_type = params[1]
634     shutdown_timeout = params[2]
635     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
636
637   @staticmethod
638   def perspective_instance_balloon_memory(params):
639     """Modify instance runtime memory.
640
641     """
642     instance_dict, memory = params
643     instance = objects.Instance.FromDict(instance_dict)
644     return backend.InstanceBalloonMemory(instance, memory)
645
646   @staticmethod
647   def perspective_instance_info(params):
648     """Query instance information.
649
650     """
651     return backend.GetInstanceInfo(params[0], params[1])
652
653   @staticmethod
654   def perspective_instance_migratable(params):
655     """Query whether the specified instance can be migrated.
656
657     """
658     instance = objects.Instance.FromDict(params[0])
659     return backend.GetInstanceMigratable(instance)
660
661   @staticmethod
662   def perspective_all_instances_info(params):
663     """Query information about all instances.
664
665     """
666     return backend.GetAllInstancesInfo(params[0])
667
668   @staticmethod
669   def perspective_instance_list(params):
670     """Query the list of running instances.
671
672     """
673     return backend.GetInstanceList(params[0])
674
675   # node --------------------------
676
677   @staticmethod
678   def perspective_node_has_ip_address(params):
679     """Checks if a node has the given ip address.
680
681     """
682     return netutils.IPAddress.Own(params[0])
683
684   @staticmethod
685   def perspective_node_info(params):
686     """Query node information.
687
688     """
689     (vg_names, hv_names) = params
690     return backend.GetNodeInfo(vg_names, hv_names)
691
692   @staticmethod
693   def perspective_etc_hosts_modify(params):
694     """Modify a node entry in /etc/hosts.
695
696     """
697     backend.EtcHostsModify(params[0], params[1], params[2])
698
699     return True
700
701   @staticmethod
702   def perspective_node_verify(params):
703     """Run a verify sequence on this node.
704
705     """
706     return backend.VerifyNode(params[0], params[1])
707
708   @staticmethod
709   def perspective_node_start_master_daemons(params):
710     """Start the master daemons on this node.
711
712     """
713     return backend.StartMasterDaemons(params[0])
714
715   @staticmethod
716   def perspective_node_activate_master_ip(params):
717     """Activate the master IP on this node.
718
719     """
720     master_params = objects.MasterNetworkParameters.FromDict(params[0])
721     return backend.ActivateMasterIp(master_params, params[1])
722
723   @staticmethod
724   def perspective_node_deactivate_master_ip(params):
725     """Deactivate the master IP on this node.
726
727     """
728     master_params = objects.MasterNetworkParameters.FromDict(params[0])
729     return backend.DeactivateMasterIp(master_params, params[1])
730
731   @staticmethod
732   def perspective_node_stop_master(params):
733     """Stops master daemons on this node.
734
735     """
736     return backend.StopMasterDaemons()
737
738   @staticmethod
739   def perspective_node_change_master_netmask(params):
740     """Change the master IP netmask.
741
742     """
743     return backend.ChangeMasterNetmask(params[0], params[1], params[2],
744                                        params[3])
745
746   @staticmethod
747   def perspective_node_leave_cluster(params):
748     """Cleanup after leaving a cluster.
749
750     """
751     return backend.LeaveCluster(params[0])
752
753   @staticmethod
754   def perspective_node_volumes(params):
755     """Query the list of all logical volume groups.
756
757     """
758     return backend.NodeVolumes()
759
760   @staticmethod
761   def perspective_node_demote_from_mc(params):
762     """Demote a node from the master candidate role.
763
764     """
765     return backend.DemoteFromMC()
766
767   @staticmethod
768   def perspective_node_powercycle(params):
769     """Tries to powercycle the nod.
770
771     """
772     hypervisor_type = params[0]
773     return backend.PowercycleNode(hypervisor_type)
774
775   # cluster --------------------------
776
777   @staticmethod
778   def perspective_version(params):
779     """Query version information.
780
781     """
782     return constants.PROTOCOL_VERSION
783
784   @staticmethod
785   def perspective_upload_file(params):
786     """Upload a file.
787
788     Note that the backend implementation imposes strict rules on which
789     files are accepted.
790
791     """
792     return backend.UploadFile(*(params[0]))
793
794   @staticmethod
795   def perspective_master_info(params):
796     """Query master information.
797
798     """
799     return backend.GetMasterInfo()
800
801   @staticmethod
802   def perspective_run_oob(params):
803     """Runs oob on node.
804
805     """
806     output = backend.RunOob(params[0], params[1], params[2], params[3])
807     if output:
808       result = serializer.LoadJson(output)
809     else:
810       result = None
811     return result
812
813   @staticmethod
814   def perspective_restricted_command(params):
815     """Runs a restricted command.
816
817     """
818     (cmd, ) = params
819
820     return backend.RunRestrictedCmd(cmd)
821
822   @staticmethod
823   def perspective_write_ssconf_files(params):
824     """Write ssconf files.
825
826     """
827     (values,) = params
828     return ssconf.WriteSsconfFiles(values)
829
830   # os -----------------------
831
832   @staticmethod
833   def perspective_os_diagnose(params):
834     """Query detailed information about existing OSes.
835
836     """
837     return backend.DiagnoseOS()
838
839   @staticmethod
840   def perspective_os_get(params):
841     """Query information about a given OS.
842
843     """
844     name = params[0]
845     os_obj = backend.OSFromDisk(name)
846     return os_obj.ToDict()
847
848   @staticmethod
849   def perspective_os_validate(params):
850     """Run a given OS' validation routine.
851
852     """
853     required, name, checks, params = params
854     return backend.ValidateOS(required, name, checks, params)
855
856   # hooks -----------------------
857
858   @staticmethod
859   def perspective_hooks_runner(params):
860     """Run hook scripts.
861
862     """
863     hpath, phase, env = params
864     hr = backend.HooksRunner()
865     return hr.RunHooks(hpath, phase, env)
866
867   # iallocator -----------------
868
869   @staticmethod
870   def perspective_iallocator_runner(params):
871     """Run an iallocator script.
872
873     """
874     name, idata = params
875     iar = backend.IAllocatorRunner()
876     return iar.Run(name, idata)
877
878   # test -----------------------
879
880   @staticmethod
881   def perspective_test_delay(params):
882     """Run test delay.
883
884     """
885     duration = params[0]
886     status, rval = utils.TestDelay(duration)
887     if not status:
888       raise backend.RPCFail(rval)
889     return rval
890
891   # file storage ---------------
892
893   @staticmethod
894   def perspective_file_storage_dir_create(params):
895     """Create the file storage directory.
896
897     """
898     file_storage_dir = params[0]
899     return backend.CreateFileStorageDir(file_storage_dir)
900
901   @staticmethod
902   def perspective_file_storage_dir_remove(params):
903     """Remove the file storage directory.
904
905     """
906     file_storage_dir = params[0]
907     return backend.RemoveFileStorageDir(file_storage_dir)
908
909   @staticmethod
910   def perspective_file_storage_dir_rename(params):
911     """Rename the file storage directory.
912
913     """
914     old_file_storage_dir = params[0]
915     new_file_storage_dir = params[1]
916     return backend.RenameFileStorageDir(old_file_storage_dir,
917                                         new_file_storage_dir)
918
919   # jobs ------------------------
920
921   @staticmethod
922   @_RequireJobQueueLock
923   def perspective_jobqueue_update(params):
924     """Update job queue.
925
926     """
927     (file_name, content) = params
928     return backend.JobQueueUpdate(file_name, content)
929
930   @staticmethod
931   @_RequireJobQueueLock
932   def perspective_jobqueue_purge(params):
933     """Purge job queue.
934
935     """
936     return backend.JobQueuePurge()
937
938   @staticmethod
939   @_RequireJobQueueLock
940   def perspective_jobqueue_rename(params):
941     """Rename a job queue file.
942
943     """
944     # TODO: What if a file fails to rename?
945     return [backend.JobQueueRename(old, new) for old, new in params[0]]
946
947   @staticmethod
948   @_RequireJobQueueLock
949   def perspective_jobqueue_set_drain_flag(params):
950     """Set job queue's drain flag.
951
952     """
953     (flag, ) = params
954
955     return jstore.SetDrainFlag(flag)
956
957   # hypervisor ---------------
958
959   @staticmethod
960   def perspective_hypervisor_validate_params(params):
961     """Validate the hypervisor parameters.
962
963     """
964     (hvname, hvparams) = params
965     return backend.ValidateHVParams(hvname, hvparams)
966
967   # Crypto
968
969   @staticmethod
970   def perspective_x509_cert_create(params):
971     """Creates a new X509 certificate for SSL/TLS.
972
973     """
974     (validity, ) = params
975     return backend.CreateX509Certificate(validity)
976
977   @staticmethod
978   def perspective_x509_cert_remove(params):
979     """Removes a X509 certificate.
980
981     """
982     (name, ) = params
983     return backend.RemoveX509Certificate(name)
984
985   # Import and export
986
987   @staticmethod
988   def perspective_import_start(params):
989     """Starts an import daemon.
990
991     """
992     (opts_s, instance, component, (dest, dest_args)) = params
993
994     opts = objects.ImportExportOptions.FromDict(opts_s)
995
996     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
997                                            None, None,
998                                            objects.Instance.FromDict(instance),
999                                            component, dest,
1000                                            _DecodeImportExportIO(dest,
1001                                                                  dest_args))
1002
1003   @staticmethod
1004   def perspective_export_start(params):
1005     """Starts an export daemon.
1006
1007     """
1008     (opts_s, host, port, instance, component, (source, source_args)) = params
1009
1010     opts = objects.ImportExportOptions.FromDict(opts_s)
1011
1012     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
1013                                            host, port,
1014                                            objects.Instance.FromDict(instance),
1015                                            component, source,
1016                                            _DecodeImportExportIO(source,
1017                                                                  source_args))
1018
1019   @staticmethod
1020   def perspective_impexp_status(params):
1021     """Retrieves the status of an import or export daemon.
1022
1023     """
1024     return backend.GetImportExportStatus(params[0])
1025
1026   @staticmethod
1027   def perspective_impexp_abort(params):
1028     """Aborts an import or export.
1029
1030     """
1031     return backend.AbortImportExport(params[0])
1032
1033   @staticmethod
1034   def perspective_impexp_cleanup(params):
1035     """Cleans up after an import or export.
1036
1037     """
1038     return backend.CleanupImportExport(params[0])
1039
1040
1041 def CheckNoded(_, args):
1042   """Initial checks whether to run or exit with a failure.
1043
1044   """
1045   if args: # noded doesn't take any arguments
1046     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1047                           sys.argv[0])
1048     sys.exit(constants.EXIT_FAILURE)
1049   try:
1050     codecs.lookup("string-escape")
1051   except LookupError:
1052     print >> sys.stderr, ("Can't load the string-escape code which is part"
1053                           " of the Python installation. Is your installation"
1054                           " complete/correct? Aborting.")
1055     sys.exit(constants.EXIT_FAILURE)
1056
1057
1058 def PrepNoded(options, _):
1059   """Preparation node daemon function, executed with the PID file held.
1060
1061   """
1062   if options.mlock:
1063     request_executor_class = MlockallRequestExecutor
1064     try:
1065       utils.Mlockall()
1066     except errors.NoCtypesError:
1067       logging.warning("Cannot set memory lock, ctypes module not found")
1068       request_executor_class = http.server.HttpServerRequestExecutor
1069   else:
1070     request_executor_class = http.server.HttpServerRequestExecutor
1071
1072   # Read SSL certificate
1073   if options.ssl:
1074     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1075                                     ssl_cert_path=options.ssl_cert)
1076   else:
1077     ssl_params = None
1078
1079   err = _PrepareQueueLock()
1080   if err is not None:
1081     # this might be some kind of file-system/permission error; while
1082     # this breaks the job queue functionality, we shouldn't prevent
1083     # startup of the whole node daemon because of this
1084     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1085
1086   handler = NodeRequestHandler()
1087
1088   mainloop = daemon.Mainloop()
1089   server = \
1090     http.server.HttpServer(mainloop, options.bind_address, options.port,
1091                            handler, ssl_params=ssl_params, ssl_verify_peer=True,
1092                            request_executor_class=request_executor_class)
1093   server.Start()
1094
1095   return (mainloop, server)
1096
1097
1098 def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1099   """Main node daemon function, executed with the PID file held.
1100
1101   """
1102   (mainloop, server) = prep_data
1103   try:
1104     mainloop.Run()
1105   finally:
1106     server.Stop()
1107
1108
1109 def Main():
1110   """Main function for the node daemon.
1111
1112   """
1113   parser = OptionParser(description="Ganeti node daemon",
1114                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
1115                         version="%%prog (ganeti) %s" %
1116                         constants.RELEASE_VERSION)
1117   parser.add_option("--no-mlock", dest="mlock",
1118                     help="Do not mlock the node memory in ram",
1119                     default=True, action="store_false")
1120
1121   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1122                      default_ssl_cert=pathutils.NODED_CERT_FILE,
1123                      default_ssl_key=pathutils.NODED_CERT_FILE,
1124                      console_logging=True)