Merge branch 'stable-2.9' into master
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti.storage import container
49 from ganeti import serializer
50 from ganeti import netutils
51 from ganeti import pathutils
52 from ganeti import ssconf
53
54 import ganeti.http.server # pylint: disable=W0611
55
56
57 queue_lock = None
58
59
60 def _extendReasonTrail(trail, source, reason=""):
61   """Extend the reason trail with noded information
62
63   The trail is extended by appending the name of the noded functionality
64   """
65   assert trail is not None
66   trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
67   trail.append((trail_source, reason, utils.EpochNano()))
68
69
70 def _PrepareQueueLock():
71   """Try to prepare the queue lock.
72
73   @return: None for success, otherwise an exception object
74
75   """
76   global queue_lock # pylint: disable=W0603
77
78   if queue_lock is not None:
79     return None
80
81   # Prepare job queue
82   try:
83     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
84     return None
85   except EnvironmentError, err:
86     return err
87
88
89 def _RequireJobQueueLock(fn):
90   """Decorator for job queue manipulating functions.
91
92   """
93   QUEUE_LOCK_TIMEOUT = 10
94
95   def wrapper(*args, **kwargs):
96     # Locking in exclusive, blocking mode because there could be several
97     # children running at the same time. Waiting up to 10 seconds.
98     if _PrepareQueueLock() is not None:
99       raise errors.JobQueueError("Job queue failed initialization,"
100                                  " cannot update jobs")
101     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
102     try:
103       return fn(*args, **kwargs)
104     finally:
105       queue_lock.Unlock()
106
107   return wrapper
108
109
110 def _DecodeImportExportIO(ieio, ieioargs):
111   """Decodes import/export I/O information.
112
113   """
114   if ieio == constants.IEIO_RAW_DISK:
115     assert len(ieioargs) == 1
116     return (objects.Disk.FromDict(ieioargs[0]), )
117
118   if ieio == constants.IEIO_SCRIPT:
119     assert len(ieioargs) == 2
120     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
121
122   return ieioargs
123
124
125 def _DefaultAlternative(value, default):
126   """Returns value or, if evaluating to False, a default value.
127
128   Returns the given value, unless it evaluates to False. In the latter case the
129   default value is returned.
130
131   @param value: Value to return if it doesn't evaluate to False
132   @param default: Default value
133   @return: Given value or the default
134
135   """
136   if value:
137     return value
138
139   return default
140
141
142 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
143   """Subclass ensuring request handlers are locked in RAM.
144
145   """
146   def __init__(self, *args, **kwargs):
147     utils.Mlockall()
148
149     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
150
151
152 class NodeRequestHandler(http.server.HttpServerHandler):
153   """The server implementation.
154
155   This class holds all methods exposed over the RPC interface.
156
157   """
158   # too many public methods, and unused args - all methods get params
159   # due to the API
160   # pylint: disable=R0904,W0613
161   def __init__(self):
162     http.server.HttpServerHandler.__init__(self)
163     self.noded_pid = os.getpid()
164
165   def HandleRequest(self, req):
166     """Handle a request.
167
168     """
169     if req.request_method.upper() != http.HTTP_POST:
170       raise http.HttpBadRequest("Only the POST method is supported")
171
172     path = req.request_path
173     if path.startswith("/"):
174       path = path[1:]
175
176     method = getattr(self, "perspective_%s" % path, None)
177     if method is None:
178       raise http.HttpNotFound()
179
180     try:
181       result = (True, method(serializer.LoadJson(req.request_body)))
182
183     except backend.RPCFail, err:
184       # our custom failure exception; str(err) works fine if the
185       # exception was constructed with a single argument, and in
186       # this case, err.message == err.args[0] == str(err)
187       result = (False, str(err))
188     except errors.QuitGanetiException, err:
189       # Tell parent to quit
190       logging.info("Shutting down the node daemon, arguments: %s",
191                    str(err.args))
192       os.kill(self.noded_pid, signal.SIGTERM)
193       # And return the error's arguments, which must be already in
194       # correct tuple format
195       result = err.args
196     except Exception, err:
197       logging.exception("Error in RPC call")
198       result = (False, "Error while executing backend function: %s" % str(err))
199
200     return serializer.DumpJson(result)
201
202   # the new block devices  --------------------------
203
204   @staticmethod
205   def perspective_blockdev_create(params):
206     """Create a block device.
207
208     """
209     (bdev_s, size, owner, on_primary, info, excl_stor) = params
210     bdev = objects.Disk.FromDict(bdev_s)
211     if bdev is None:
212       raise ValueError("can't unserialize data!")
213     return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
214                                   excl_stor)
215
216   @staticmethod
217   def perspective_blockdev_pause_resume_sync(params):
218     """Pause/resume sync of a block device.
219
220     """
221     disks_s, pause = params
222     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
223     return backend.BlockdevPauseResumeSync(disks, pause)
224
225   @staticmethod
226   def perspective_blockdev_wipe(params):
227     """Wipe a block device.
228
229     """
230     bdev_s, offset, size = params
231     bdev = objects.Disk.FromDict(bdev_s)
232     return backend.BlockdevWipe(bdev, offset, size)
233
234   @staticmethod
235   def perspective_blockdev_remove(params):
236     """Remove a block device.
237
238     """
239     bdev_s = params[0]
240     bdev = objects.Disk.FromDict(bdev_s)
241     return backend.BlockdevRemove(bdev)
242
243   @staticmethod
244   def perspective_blockdev_rename(params):
245     """Remove a block device.
246
247     """
248     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
249     return backend.BlockdevRename(devlist)
250
251   @staticmethod
252   def perspective_blockdev_assemble(params):
253     """Assemble a block device.
254
255     """
256     bdev_s, owner, on_primary, idx = params
257     bdev = objects.Disk.FromDict(bdev_s)
258     if bdev is None:
259       raise ValueError("can't unserialize data!")
260     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
261
262   @staticmethod
263   def perspective_blockdev_shutdown(params):
264     """Shutdown a block device.
265
266     """
267     bdev_s = params[0]
268     bdev = objects.Disk.FromDict(bdev_s)
269     if bdev is None:
270       raise ValueError("can't unserialize data!")
271     return backend.BlockdevShutdown(bdev)
272
273   @staticmethod
274   def perspective_blockdev_addchildren(params):
275     """Add a child to a mirror device.
276
277     Note: this is only valid for mirror devices. It's the caller's duty
278     to send a correct disk, otherwise we raise an error.
279
280     """
281     bdev_s, ndev_s = params
282     bdev = objects.Disk.FromDict(bdev_s)
283     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
284     if bdev is None or ndevs.count(None) > 0:
285       raise ValueError("can't unserialize data!")
286     return backend.BlockdevAddchildren(bdev, ndevs)
287
288   @staticmethod
289   def perspective_blockdev_removechildren(params):
290     """Remove a child from a mirror device.
291
292     This is only valid for mirror devices, of course. It's the callers
293     duty to send a correct disk, otherwise we raise an error.
294
295     """
296     bdev_s, ndev_s = params
297     bdev = objects.Disk.FromDict(bdev_s)
298     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
299     if bdev is None or ndevs.count(None) > 0:
300       raise ValueError("can't unserialize data!")
301     return backend.BlockdevRemovechildren(bdev, ndevs)
302
303   @staticmethod
304   def perspective_blockdev_getmirrorstatus(params):
305     """Return the mirror status for a list of disks.
306
307     """
308     disks = [objects.Disk.FromDict(dsk_s)
309              for dsk_s in params[0]]
310     return [status.ToDict()
311             for status in backend.BlockdevGetmirrorstatus(disks)]
312
313   @staticmethod
314   def perspective_blockdev_getmirrorstatus_multi(params):
315     """Return the mirror status for a list of disks.
316
317     """
318     (node_disks, ) = params
319
320     disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
321
322     result = []
323
324     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
325       if success:
326         result.append((success, status.ToDict()))
327       else:
328         result.append((success, status))
329
330     return result
331
332   @staticmethod
333   def perspective_blockdev_find(params):
334     """Expose the FindBlockDevice functionality for a disk.
335
336     This will try to find but not activate a disk.
337
338     """
339     disk = objects.Disk.FromDict(params[0])
340
341     result = backend.BlockdevFind(disk)
342     if result is None:
343       return None
344
345     return result.ToDict()
346
347   @staticmethod
348   def perspective_blockdev_snapshot(params):
349     """Create a snapshot device.
350
351     Note that this is only valid for LVM disks, if we get passed
352     something else we raise an exception. The snapshot device can be
353     remove by calling the generic block device remove call.
354
355     """
356     cfbd = objects.Disk.FromDict(params[0])
357     return backend.BlockdevSnapshot(cfbd)
358
359   @staticmethod
360   def perspective_blockdev_grow(params):
361     """Grow a stack of devices.
362
363     """
364     if len(params) < 5:
365       raise ValueError("Received only %s parameters in blockdev_grow,"
366                        " old master?" % len(params))
367     cfbd = objects.Disk.FromDict(params[0])
368     amount = params[1]
369     dryrun = params[2]
370     backingstore = params[3]
371     excl_stor = params[4]
372     return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
373
374   @staticmethod
375   def perspective_blockdev_close(params):
376     """Closes the given block devices.
377
378     """
379     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
380     return backend.BlockdevClose(params[0], disks)
381
382   @staticmethod
383   def perspective_blockdev_getdimensions(params):
384     """Compute the sizes of the given block devices.
385
386     """
387     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
388     return backend.BlockdevGetdimensions(disks)
389
390   @staticmethod
391   def perspective_blockdev_export(params):
392     """Compute the sizes of the given block devices.
393
394     """
395     disk = objects.Disk.FromDict(params[0])
396     dest_node, dest_path, cluster_name = params[1:]
397     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
398
399   @staticmethod
400   def perspective_blockdev_setinfo(params):
401     """Sets metadata information on the given block device.
402
403     """
404     (disk, info) = params
405     disk = objects.Disk.FromDict(disk)
406     return backend.BlockdevSetInfo(disk, info)
407
408   # blockdev/drbd specific methods ----------
409
410   @staticmethod
411   def perspective_drbd_disconnect_net(params):
412     """Disconnects the network connection of drbd disks.
413
414     Note that this is only valid for drbd disks, so the members of the
415     disk list must all be drbd devices.
416
417     """
418     nodes_ip, disks, target_node_uuid = params
419     disks = [objects.Disk.FromDict(cf) for cf in disks]
420     return backend.DrbdDisconnectNet(target_node_uuid, nodes_ip, disks)
421
422   @staticmethod
423   def perspective_drbd_attach_net(params):
424     """Attaches the network connection of drbd disks.
425
426     Note that this is only valid for drbd disks, so the members of the
427     disk list must all be drbd devices.
428
429     """
430     nodes_ip, disks, instance_name, multimaster, target_node_uuid = params
431     disks = [objects.Disk.FromDict(cf) for cf in disks]
432     return backend.DrbdAttachNet(target_node_uuid, nodes_ip, disks,
433                                  instance_name, multimaster)
434
435   @staticmethod
436   def perspective_drbd_wait_sync(params):
437     """Wait until DRBD disks are synched.
438
439     Note that this is only valid for drbd disks, so the members of the
440     disk list must all be drbd devices.
441
442     """
443     nodes_ip, disks, target_node_uuid = params
444     disks = [objects.Disk.FromDict(cf) for cf in disks]
445     return backend.DrbdWaitSync(target_node_uuid, nodes_ip, disks)
446
447   @staticmethod
448   def perspective_drbd_needs_activation(params):
449     """Checks if the drbd devices need activation
450
451     Note that this is only valid for drbd disks, so the members of the
452     disk list must all be drbd devices.
453
454     """
455     nodes_ip, disks, target_node_uuid = params
456     disks = [objects.Disk.FromDict(cf) for cf in disks]
457     return backend.DrbdNeedsActivation(target_node_uuid, nodes_ip, disks)
458
459   @staticmethod
460   def perspective_drbd_helper(params):
461     """Query drbd helper.
462
463     """
464     return backend.GetDrbdUsermodeHelper()
465
466   # export/import  --------------------------
467
468   @staticmethod
469   def perspective_finalize_export(params):
470     """Expose the finalize export functionality.
471
472     """
473     instance = objects.Instance.FromDict(params[0])
474
475     snap_disks = []
476     for disk in params[1]:
477       if isinstance(disk, bool):
478         snap_disks.append(disk)
479       else:
480         snap_disks.append(objects.Disk.FromDict(disk))
481
482     return backend.FinalizeExport(instance, snap_disks)
483
484   @staticmethod
485   def perspective_export_info(params):
486     """Query information about an existing export on this node.
487
488     The given path may not contain an export, in which case we return
489     None.
490
491     """
492     path = params[0]
493     return backend.ExportInfo(path)
494
495   @staticmethod
496   def perspective_export_list(params):
497     """List the available exports on this node.
498
499     Note that as opposed to export_info, which may query data about an
500     export in any path, this only queries the standard Ganeti path
501     (pathutils.EXPORT_DIR).
502
503     """
504     return backend.ListExports()
505
506   @staticmethod
507   def perspective_export_remove(params):
508     """Remove an export.
509
510     """
511     export = params[0]
512     return backend.RemoveExport(export)
513
514   # block device ---------------------
515   @staticmethod
516   def perspective_bdev_sizes(params):
517     """Query the list of block devices
518
519     """
520     devices = params[0]
521     return backend.GetBlockDevSizes(devices)
522
523   # volume  --------------------------
524
525   @staticmethod
526   def perspective_lv_list(params):
527     """Query the list of logical volumes in a given volume group.
528
529     """
530     vgname = params[0]
531     return backend.GetVolumeList(vgname)
532
533   @staticmethod
534   def perspective_vg_list(params):
535     """Query the list of volume groups.
536
537     """
538     return backend.ListVolumeGroups()
539
540   # Storage --------------------------
541
542   @staticmethod
543   def perspective_storage_list(params):
544     """Get list of storage units.
545
546     """
547     (su_name, su_args, name, fields) = params
548     return container.GetStorage(su_name, *su_args).List(name, fields)
549
550   @staticmethod
551   def perspective_storage_modify(params):
552     """Modify a storage unit.
553
554     """
555     (su_name, su_args, name, changes) = params
556     return container.GetStorage(su_name, *su_args).Modify(name, changes)
557
558   @staticmethod
559   def perspective_storage_execute(params):
560     """Execute an operation on a storage unit.
561
562     """
563     (su_name, su_args, name, op) = params
564     return container.GetStorage(su_name, *su_args).Execute(name, op)
565
566   # bridge  --------------------------
567
568   @staticmethod
569   def perspective_bridges_exist(params):
570     """Check if all bridges given exist on this node.
571
572     """
573     bridges_list = params[0]
574     return backend.BridgesExist(bridges_list)
575
576   # instance  --------------------------
577
578   @staticmethod
579   def perspective_instance_os_add(params):
580     """Install an OS on a given instance.
581
582     """
583     inst_s = params[0]
584     inst = objects.Instance.FromDict(inst_s)
585     reinstall = params[1]
586     debug = params[2]
587     return backend.InstanceOsAdd(inst, reinstall, debug)
588
589   @staticmethod
590   def perspective_instance_run_rename(params):
591     """Runs the OS rename script for an instance.
592
593     """
594     inst_s, old_name, debug = params
595     inst = objects.Instance.FromDict(inst_s)
596     return backend.RunRenameInstance(inst, old_name, debug)
597
598   @staticmethod
599   def perspective_instance_shutdown(params):
600     """Shutdown an instance.
601
602     """
603     instance = objects.Instance.FromDict(params[0])
604     timeout = params[1]
605     trail = params[2]
606     _extendReasonTrail(trail, "shutdown")
607     return backend.InstanceShutdown(instance, timeout, trail)
608
609   @staticmethod
610   def perspective_instance_start(params):
611     """Start an instance.
612
613     """
614     (instance_name, startup_paused, trail) = params
615     instance = objects.Instance.FromDict(instance_name)
616     _extendReasonTrail(trail, "start")
617     return backend.StartInstance(instance, startup_paused, trail)
618
619   @staticmethod
620   def perspective_migration_info(params):
621     """Gather information about an instance to be migrated.
622
623     """
624     instance = objects.Instance.FromDict(params[0])
625     return backend.MigrationInfo(instance)
626
627   @staticmethod
628   def perspective_accept_instance(params):
629     """Prepare the node to accept an instance.
630
631     """
632     instance, info, target = params
633     instance = objects.Instance.FromDict(instance)
634     return backend.AcceptInstance(instance, info, target)
635
636   @staticmethod
637   def perspective_instance_finalize_migration_dst(params):
638     """Finalize the instance migration on the destination node.
639
640     """
641     instance, info, success = params
642     instance = objects.Instance.FromDict(instance)
643     return backend.FinalizeMigrationDst(instance, info, success)
644
645   @staticmethod
646   def perspective_instance_migrate(params):
647     """Migrates an instance.
648
649     """
650     cluster_name, instance, target, live = params
651     instance = objects.Instance.FromDict(instance)
652     return backend.MigrateInstance(cluster_name, instance, target, live)
653
654   @staticmethod
655   def perspective_instance_finalize_migration_src(params):
656     """Finalize the instance migration on the source node.
657
658     """
659     instance, success, live = params
660     instance = objects.Instance.FromDict(instance)
661     return backend.FinalizeMigrationSource(instance, success, live)
662
663   @staticmethod
664   def perspective_instance_get_migration_status(params):
665     """Reports migration status.
666
667     """
668     instance = objects.Instance.FromDict(params[0])
669     return backend.GetMigrationStatus(instance).ToDict()
670
671   @staticmethod
672   def perspective_instance_reboot(params):
673     """Reboot an instance.
674
675     """
676     instance = objects.Instance.FromDict(params[0])
677     reboot_type = params[1]
678     shutdown_timeout = params[2]
679     trail = params[3]
680     _extendReasonTrail(trail, "reboot")
681     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
682                                   trail)
683
684   @staticmethod
685   def perspective_instance_balloon_memory(params):
686     """Modify instance runtime memory.
687
688     """
689     instance_dict, memory = params
690     instance = objects.Instance.FromDict(instance_dict)
691     return backend.InstanceBalloonMemory(instance, memory)
692
693   @staticmethod
694   def perspective_instance_info(params):
695     """Query instance information.
696
697     """
698     (instance_name, hypervisor_name, hvparams) = params
699     return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
700
701   @staticmethod
702   def perspective_instance_migratable(params):
703     """Query whether the specified instance can be migrated.
704
705     """
706     instance = objects.Instance.FromDict(params[0])
707     return backend.GetInstanceMigratable(instance)
708
709   @staticmethod
710   def perspective_all_instances_info(params):
711     """Query information about all instances.
712
713     """
714     (hypervisor_list, all_hvparams) = params
715     return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
716
717   @staticmethod
718   def perspective_instance_list(params):
719     """Query the list of running instances.
720
721     """
722     (hypervisor_list, hvparams) = params
723     return backend.GetInstanceList(hypervisor_list, hvparams)
724
725   # node --------------------------
726
727   @staticmethod
728   def perspective_node_has_ip_address(params):
729     """Checks if a node has the given ip address.
730
731     """
732     return netutils.IPAddress.Own(params[0])
733
734   @staticmethod
735   def perspective_node_info(params):
736     """Query node information.
737
738     """
739     (storage_units, hv_specs) = params
740     return backend.GetNodeInfo(storage_units, hv_specs)
741
742   @staticmethod
743   def perspective_etc_hosts_modify(params):
744     """Modify a node entry in /etc/hosts.
745
746     """
747     backend.EtcHostsModify(params[0], params[1], params[2])
748
749     return True
750
751   @staticmethod
752   def perspective_node_verify(params):
753     """Run a verify sequence on this node.
754
755     """
756     (what, cluster_name, hvparams) = params
757     return backend.VerifyNode(what, cluster_name, hvparams)
758
759   @classmethod
760   def perspective_node_verify_light(cls, params):
761     """Run a light verify sequence on this node.
762
763     """
764     # So far it's the same as the normal node_verify
765     return cls.perspective_node_verify(params)
766
767   @staticmethod
768   def perspective_node_start_master_daemons(params):
769     """Start the master daemons on this node.
770
771     """
772     return backend.StartMasterDaemons(params[0])
773
774   @staticmethod
775   def perspective_node_activate_master_ip(params):
776     """Activate the master IP on this node.
777
778     """
779     master_params = objects.MasterNetworkParameters.FromDict(params[0])
780     return backend.ActivateMasterIp(master_params, params[1])
781
782   @staticmethod
783   def perspective_node_deactivate_master_ip(params):
784     """Deactivate the master IP on this node.
785
786     """
787     master_params = objects.MasterNetworkParameters.FromDict(params[0])
788     return backend.DeactivateMasterIp(master_params, params[1])
789
790   @staticmethod
791   def perspective_node_stop_master(params):
792     """Stops master daemons on this node.
793
794     """
795     return backend.StopMasterDaemons()
796
797   @staticmethod
798   def perspective_node_change_master_netmask(params):
799     """Change the master IP netmask.
800
801     """
802     return backend.ChangeMasterNetmask(params[0], params[1], params[2],
803                                        params[3])
804
805   @staticmethod
806   def perspective_node_leave_cluster(params):
807     """Cleanup after leaving a cluster.
808
809     """
810     return backend.LeaveCluster(params[0])
811
812   @staticmethod
813   def perspective_node_volumes(params):
814     """Query the list of all logical volume groups.
815
816     """
817     return backend.NodeVolumes()
818
819   @staticmethod
820   def perspective_node_demote_from_mc(params):
821     """Demote a node from the master candidate role.
822
823     """
824     return backend.DemoteFromMC()
825
826   @staticmethod
827   def perspective_node_powercycle(params):
828     """Tries to powercycle the node.
829
830     """
831     (hypervisor_type, hvparams) = params
832     return backend.PowercycleNode(hypervisor_type, hvparams)
833
834   @staticmethod
835   def perspective_node_configure_ovs(params):
836     """Sets up OpenvSwitch on the node.
837
838     """
839     (ovs_name, ovs_link) = params
840     return backend.ConfigureOVS(ovs_name, ovs_link)
841
842   # cluster --------------------------
843
844   @staticmethod
845   def perspective_version(params):
846     """Query version information.
847
848     """
849     return constants.PROTOCOL_VERSION
850
851   @staticmethod
852   def perspective_upload_file(params):
853     """Upload a file.
854
855     Note that the backend implementation imposes strict rules on which
856     files are accepted.
857
858     """
859     return backend.UploadFile(*(params[0]))
860
861   @staticmethod
862   def perspective_master_info(params):
863     """Query master information.
864
865     """
866     return backend.GetMasterInfo()
867
868   @staticmethod
869   def perspective_run_oob(params):
870     """Runs oob on node.
871
872     """
873     output = backend.RunOob(params[0], params[1], params[2], params[3])
874     if output:
875       result = serializer.LoadJson(output)
876     else:
877       result = None
878     return result
879
880   @staticmethod
881   def perspective_restricted_command(params):
882     """Runs a restricted command.
883
884     """
885     (cmd, ) = params
886
887     return backend.RunRestrictedCmd(cmd)
888
889   @staticmethod
890   def perspective_write_ssconf_files(params):
891     """Write ssconf files.
892
893     """
894     (values,) = params
895     return ssconf.WriteSsconfFiles(values)
896
897   @staticmethod
898   def perspective_get_watcher_pause(params):
899     """Get watcher pause end.
900
901     """
902     return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
903
904   @staticmethod
905   def perspective_set_watcher_pause(params):
906     """Set watcher pause.
907
908     """
909     (until, ) = params
910     return backend.SetWatcherPause(until)
911
912   # os -----------------------
913
914   @staticmethod
915   def perspective_os_diagnose(params):
916     """Query detailed information about existing OSes.
917
918     """
919     return backend.DiagnoseOS()
920
921   @staticmethod
922   def perspective_os_get(params):
923     """Query information about a given OS.
924
925     """
926     name = params[0]
927     os_obj = backend.OSFromDisk(name)
928     return os_obj.ToDict()
929
930   @staticmethod
931   def perspective_os_validate(params):
932     """Run a given OS' validation routine.
933
934     """
935     required, name, checks, params = params
936     return backend.ValidateOS(required, name, checks, params)
937
938   # extstorage -----------------------
939
940   @staticmethod
941   def perspective_extstorage_diagnose(params):
942     """Query detailed information about existing extstorage providers.
943
944     """
945     return backend.DiagnoseExtStorage()
946
947   # hooks -----------------------
948
949   @staticmethod
950   def perspective_hooks_runner(params):
951     """Run hook scripts.
952
953     """
954     hpath, phase, env = params
955     hr = backend.HooksRunner()
956     return hr.RunHooks(hpath, phase, env)
957
958   # iallocator -----------------
959
960   @staticmethod
961   def perspective_iallocator_runner(params):
962     """Run an iallocator script.
963
964     """
965     name, idata = params
966     iar = backend.IAllocatorRunner()
967     return iar.Run(name, idata)
968
969   # test -----------------------
970
971   @staticmethod
972   def perspective_test_delay(params):
973     """Run test delay.
974
975     """
976     duration = params[0]
977     status, rval = utils.TestDelay(duration)
978     if not status:
979       raise backend.RPCFail(rval)
980     return rval
981
982   # file storage ---------------
983
984   @staticmethod
985   def perspective_file_storage_dir_create(params):
986     """Create the file storage directory.
987
988     """
989     file_storage_dir = params[0]
990     return backend.CreateFileStorageDir(file_storage_dir)
991
992   @staticmethod
993   def perspective_file_storage_dir_remove(params):
994     """Remove the file storage directory.
995
996     """
997     file_storage_dir = params[0]
998     return backend.RemoveFileStorageDir(file_storage_dir)
999
1000   @staticmethod
1001   def perspective_file_storage_dir_rename(params):
1002     """Rename the file storage directory.
1003
1004     """
1005     old_file_storage_dir = params[0]
1006     new_file_storage_dir = params[1]
1007     return backend.RenameFileStorageDir(old_file_storage_dir,
1008                                         new_file_storage_dir)
1009
1010   # jobs ------------------------
1011
1012   @staticmethod
1013   @_RequireJobQueueLock
1014   def perspective_jobqueue_update(params):
1015     """Update job queue.
1016
1017     """
1018     (file_name, content) = params
1019     return backend.JobQueueUpdate(file_name, content)
1020
1021   @staticmethod
1022   @_RequireJobQueueLock
1023   def perspective_jobqueue_purge(params):
1024     """Purge job queue.
1025
1026     """
1027     return backend.JobQueuePurge()
1028
1029   @staticmethod
1030   @_RequireJobQueueLock
1031   def perspective_jobqueue_rename(params):
1032     """Rename a job queue file.
1033
1034     """
1035     # TODO: What if a file fails to rename?
1036     return [backend.JobQueueRename(old, new) for old, new in params[0]]
1037
1038   @staticmethod
1039   @_RequireJobQueueLock
1040   def perspective_jobqueue_set_drain_flag(params):
1041     """Set job queue's drain flag.
1042
1043     """
1044     (flag, ) = params
1045
1046     return jstore.SetDrainFlag(flag)
1047
1048   # hypervisor ---------------
1049
1050   @staticmethod
1051   def perspective_hypervisor_validate_params(params):
1052     """Validate the hypervisor parameters.
1053
1054     """
1055     (hvname, hvparams) = params
1056     return backend.ValidateHVParams(hvname, hvparams)
1057
1058   # Crypto
1059
1060   @staticmethod
1061   def perspective_x509_cert_create(params):
1062     """Creates a new X509 certificate for SSL/TLS.
1063
1064     """
1065     (validity, ) = params
1066     return backend.CreateX509Certificate(validity)
1067
1068   @staticmethod
1069   def perspective_x509_cert_remove(params):
1070     """Removes a X509 certificate.
1071
1072     """
1073     (name, ) = params
1074     return backend.RemoveX509Certificate(name)
1075
1076   # Import and export
1077
1078   @staticmethod
1079   def perspective_import_start(params):
1080     """Starts an import daemon.
1081
1082     """
1083     (opts_s, instance, component, (dest, dest_args)) = params
1084
1085     opts = objects.ImportExportOptions.FromDict(opts_s)
1086
1087     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
1088                                            None, None,
1089                                            objects.Instance.FromDict(instance),
1090                                            component, dest,
1091                                            _DecodeImportExportIO(dest,
1092                                                                  dest_args))
1093
1094   @staticmethod
1095   def perspective_export_start(params):
1096     """Starts an export daemon.
1097
1098     """
1099     (opts_s, host, port, instance, component, (source, source_args)) = params
1100
1101     opts = objects.ImportExportOptions.FromDict(opts_s)
1102
1103     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
1104                                            host, port,
1105                                            objects.Instance.FromDict(instance),
1106                                            component, source,
1107                                            _DecodeImportExportIO(source,
1108                                                                  source_args))
1109
1110   @staticmethod
1111   def perspective_impexp_status(params):
1112     """Retrieves the status of an import or export daemon.
1113
1114     """
1115     return backend.GetImportExportStatus(params[0])
1116
1117   @staticmethod
1118   def perspective_impexp_abort(params):
1119     """Aborts an import or export.
1120
1121     """
1122     return backend.AbortImportExport(params[0])
1123
1124   @staticmethod
1125   def perspective_impexp_cleanup(params):
1126     """Cleans up after an import or export.
1127
1128     """
1129     return backend.CleanupImportExport(params[0])
1130
1131
1132 def CheckNoded(_, args):
1133   """Initial checks whether to run or exit with a failure.
1134
1135   """
1136   if args: # noded doesn't take any arguments
1137     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1138                           sys.argv[0])
1139     sys.exit(constants.EXIT_FAILURE)
1140   try:
1141     codecs.lookup("string-escape")
1142   except LookupError:
1143     print >> sys.stderr, ("Can't load the string-escape code which is part"
1144                           " of the Python installation. Is your installation"
1145                           " complete/correct? Aborting.")
1146     sys.exit(constants.EXIT_FAILURE)
1147
1148
1149 def PrepNoded(options, _):
1150   """Preparation node daemon function, executed with the PID file held.
1151
1152   """
1153   if options.mlock:
1154     request_executor_class = MlockallRequestExecutor
1155     try:
1156       utils.Mlockall()
1157     except errors.NoCtypesError:
1158       logging.warning("Cannot set memory lock, ctypes module not found")
1159       request_executor_class = http.server.HttpServerRequestExecutor
1160   else:
1161     request_executor_class = http.server.HttpServerRequestExecutor
1162
1163   # Read SSL certificate
1164   if options.ssl:
1165     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1166                                     ssl_cert_path=options.ssl_cert)
1167   else:
1168     ssl_params = None
1169
1170   err = _PrepareQueueLock()
1171   if err is not None:
1172     # this might be some kind of file-system/permission error; while
1173     # this breaks the job queue functionality, we shouldn't prevent
1174     # startup of the whole node daemon because of this
1175     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1176
1177   handler = NodeRequestHandler()
1178
1179   mainloop = daemon.Mainloop()
1180   server = \
1181     http.server.HttpServer(mainloop, options.bind_address, options.port,
1182                            handler, ssl_params=ssl_params, ssl_verify_peer=True,
1183                            request_executor_class=request_executor_class)
1184   server.Start()
1185
1186   return (mainloop, server)
1187
1188
1189 def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1190   """Main node daemon function, executed with the PID file held.
1191
1192   """
1193   (mainloop, server) = prep_data
1194   try:
1195     mainloop.Run()
1196   finally:
1197     server.Stop()
1198
1199
1200 def Main():
1201   """Main function for the node daemon.
1202
1203   """
1204   parser = OptionParser(description="Ganeti node daemon",
1205                         usage=("%prog [-f] [-d] [-p port] [-b ADDRESS]"
1206                                " [-i INTERFACE]"),
1207                         version="%%prog (ganeti) %s" %
1208                         constants.RELEASE_VERSION)
1209   parser.add_option("--no-mlock", dest="mlock",
1210                     help="Do not mlock the node memory in ram",
1211                     default=True, action="store_false")
1212
1213   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1214                      default_ssl_cert=pathutils.NODED_CERT_FILE,
1215                      default_ssl_key=pathutils.NODED_CERT_FILE,
1216                      console_logging=True)