Check DRBD status on verify-disks
[ganeti-local] / lib / server / noded.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # pylint: disable=C0103,W0142
25
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
28
29 # W0142: Used * or ** magic, since we do use it extensively in this
30 # module
31
32 import os
33 import sys
34 import logging
35 import signal
36 import codecs
37
38 from optparse import OptionParser
39
40 from ganeti import backend
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import errors
44 from ganeti import jstore
45 from ganeti import daemon
46 from ganeti import http
47 from ganeti import utils
48 from ganeti.storage import container
49 from ganeti import serializer
50 from ganeti import netutils
51 from ganeti import pathutils
52 from ganeti import ssconf
53
54 import ganeti.http.server # pylint: disable=W0611
55
56
57 queue_lock = None
58
59
60 def _extendReasonTrail(trail, source, reason=""):
61   """Extend the reason trail with noded information
62
63   The trail is extended by appending the name of the noded functionality
64   """
65   assert trail is not None
66   trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
67   trail.append((trail_source, reason, utils.EpochNano()))
68
69
70 def _PrepareQueueLock():
71   """Try to prepare the queue lock.
72
73   @return: None for success, otherwise an exception object
74
75   """
76   global queue_lock # pylint: disable=W0603
77
78   if queue_lock is not None:
79     return None
80
81   # Prepare job queue
82   try:
83     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
84     return None
85   except EnvironmentError, err:
86     return err
87
88
89 def _RequireJobQueueLock(fn):
90   """Decorator for job queue manipulating functions.
91
92   """
93   QUEUE_LOCK_TIMEOUT = 10
94
95   def wrapper(*args, **kwargs):
96     # Locking in exclusive, blocking mode because there could be several
97     # children running at the same time. Waiting up to 10 seconds.
98     if _PrepareQueueLock() is not None:
99       raise errors.JobQueueError("Job queue failed initialization,"
100                                  " cannot update jobs")
101     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
102     try:
103       return fn(*args, **kwargs)
104     finally:
105       queue_lock.Unlock()
106
107   return wrapper
108
109
110 def _DecodeImportExportIO(ieio, ieioargs):
111   """Decodes import/export I/O information.
112
113   """
114   if ieio == constants.IEIO_RAW_DISK:
115     assert len(ieioargs) == 1
116     return (objects.Disk.FromDict(ieioargs[0]), )
117
118   if ieio == constants.IEIO_SCRIPT:
119     assert len(ieioargs) == 2
120     return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
121
122   return ieioargs
123
124
125 def _DefaultAlternative(value, default):
126   """Returns value or, if evaluating to False, a default value.
127
128   Returns the given value, unless it evaluates to False. In the latter case the
129   default value is returned.
130
131   @param value: Value to return if it doesn't evaluate to False
132   @param default: Default value
133   @return: Given value or the default
134
135   """
136   if value:
137     return value
138
139   return default
140
141
142 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
143   """Subclass ensuring request handlers are locked in RAM.
144
145   """
146   def __init__(self, *args, **kwargs):
147     utils.Mlockall()
148
149     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
150
151
152 class NodeRequestHandler(http.server.HttpServerHandler):
153   """The server implementation.
154
155   This class holds all methods exposed over the RPC interface.
156
157   """
158   # too many public methods, and unused args - all methods get params
159   # due to the API
160   # pylint: disable=R0904,W0613
161   def __init__(self):
162     http.server.HttpServerHandler.__init__(self)
163     self.noded_pid = os.getpid()
164
165   def HandleRequest(self, req):
166     """Handle a request.
167
168     """
169     if req.request_method.upper() != http.HTTP_POST:
170       raise http.HttpBadRequest("Only the POST method is supported")
171
172     path = req.request_path
173     if path.startswith("/"):
174       path = path[1:]
175
176     method = getattr(self, "perspective_%s" % path, None)
177     if method is None:
178       raise http.HttpNotFound()
179
180     try:
181       result = (True, method(serializer.LoadJson(req.request_body)))
182
183     except backend.RPCFail, err:
184       # our custom failure exception; str(err) works fine if the
185       # exception was constructed with a single argument, and in
186       # this case, err.message == err.args[0] == str(err)
187       result = (False, str(err))
188     except errors.QuitGanetiException, err:
189       # Tell parent to quit
190       logging.info("Shutting down the node daemon, arguments: %s",
191                    str(err.args))
192       os.kill(self.noded_pid, signal.SIGTERM)
193       # And return the error's arguments, which must be already in
194       # correct tuple format
195       result = err.args
196     except Exception, err:
197       logging.exception("Error in RPC call")
198       result = (False, "Error while executing backend function: %s" % str(err))
199
200     return serializer.DumpJson(result)
201
202   # the new block devices  --------------------------
203
204   @staticmethod
205   def perspective_blockdev_create(params):
206     """Create a block device.
207
208     """
209     (bdev_s, size, owner, on_primary, info, excl_stor) = params
210     bdev = objects.Disk.FromDict(bdev_s)
211     if bdev is None:
212       raise ValueError("can't unserialize data!")
213     return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
214                                   excl_stor)
215
216   @staticmethod
217   def perspective_blockdev_pause_resume_sync(params):
218     """Pause/resume sync of a block device.
219
220     """
221     disks_s, pause = params
222     disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
223     return backend.BlockdevPauseResumeSync(disks, pause)
224
225   @staticmethod
226   def perspective_blockdev_wipe(params):
227     """Wipe a block device.
228
229     """
230     bdev_s, offset, size = params
231     bdev = objects.Disk.FromDict(bdev_s)
232     return backend.BlockdevWipe(bdev, offset, size)
233
234   @staticmethod
235   def perspective_blockdev_remove(params):
236     """Remove a block device.
237
238     """
239     bdev_s = params[0]
240     bdev = objects.Disk.FromDict(bdev_s)
241     return backend.BlockdevRemove(bdev)
242
243   @staticmethod
244   def perspective_blockdev_rename(params):
245     """Remove a block device.
246
247     """
248     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
249     return backend.BlockdevRename(devlist)
250
251   @staticmethod
252   def perspective_blockdev_assemble(params):
253     """Assemble a block device.
254
255     """
256     bdev_s, owner, on_primary, idx = params
257     bdev = objects.Disk.FromDict(bdev_s)
258     if bdev is None:
259       raise ValueError("can't unserialize data!")
260     return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
261
262   @staticmethod
263   def perspective_blockdev_shutdown(params):
264     """Shutdown a block device.
265
266     """
267     bdev_s = params[0]
268     bdev = objects.Disk.FromDict(bdev_s)
269     if bdev is None:
270       raise ValueError("can't unserialize data!")
271     return backend.BlockdevShutdown(bdev)
272
273   @staticmethod
274   def perspective_blockdev_addchildren(params):
275     """Add a child to a mirror device.
276
277     Note: this is only valid for mirror devices. It's the caller's duty
278     to send a correct disk, otherwise we raise an error.
279
280     """
281     bdev_s, ndev_s = params
282     bdev = objects.Disk.FromDict(bdev_s)
283     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
284     if bdev is None or ndevs.count(None) > 0:
285       raise ValueError("can't unserialize data!")
286     return backend.BlockdevAddchildren(bdev, ndevs)
287
288   @staticmethod
289   def perspective_blockdev_removechildren(params):
290     """Remove a child from a mirror device.
291
292     This is only valid for mirror devices, of course. It's the callers
293     duty to send a correct disk, otherwise we raise an error.
294
295     """
296     bdev_s, ndev_s = params
297     bdev = objects.Disk.FromDict(bdev_s)
298     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
299     if bdev is None or ndevs.count(None) > 0:
300       raise ValueError("can't unserialize data!")
301     return backend.BlockdevRemovechildren(bdev, ndevs)
302
303   @staticmethod
304   def perspective_blockdev_getmirrorstatus(params):
305     """Return the mirror status for a list of disks.
306
307     """
308     disks = [objects.Disk.FromDict(dsk_s)
309              for dsk_s in params[0]]
310     return [status.ToDict()
311             for status in backend.BlockdevGetmirrorstatus(disks)]
312
313   @staticmethod
314   def perspective_blockdev_getmirrorstatus_multi(params):
315     """Return the mirror status for a list of disks.
316
317     """
318     (node_disks, ) = params
319
320     disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
321
322     result = []
323
324     for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
325       if success:
326         result.append((success, status.ToDict()))
327       else:
328         result.append((success, status))
329
330     return result
331
332   @staticmethod
333   def perspective_blockdev_find(params):
334     """Expose the FindBlockDevice functionality for a disk.
335
336     This will try to find but not activate a disk.
337
338     """
339     disk = objects.Disk.FromDict(params[0])
340
341     result = backend.BlockdevFind(disk)
342     if result is None:
343       return None
344
345     return result.ToDict()
346
347   @staticmethod
348   def perspective_blockdev_snapshot(params):
349     """Create a snapshot device.
350
351     Note that this is only valid for LVM disks, if we get passed
352     something else we raise an exception. The snapshot device can be
353     remove by calling the generic block device remove call.
354
355     """
356     cfbd = objects.Disk.FromDict(params[0])
357     return backend.BlockdevSnapshot(cfbd)
358
359   @staticmethod
360   def perspective_blockdev_grow(params):
361     """Grow a stack of devices.
362
363     """
364     if len(params) < 5:
365       raise ValueError("Received only %s parameters in blockdev_grow,"
366                        " old master?" % len(params))
367     cfbd = objects.Disk.FromDict(params[0])
368     amount = params[1]
369     dryrun = params[2]
370     backingstore = params[3]
371     excl_stor = params[4]
372     return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
373
374   @staticmethod
375   def perspective_blockdev_close(params):
376     """Closes the given block devices.
377
378     """
379     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
380     return backend.BlockdevClose(params[0], disks)
381
382   @staticmethod
383   def perspective_blockdev_getdimensions(params):
384     """Compute the sizes of the given block devices.
385
386     """
387     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
388     return backend.BlockdevGetdimensions(disks)
389
390   @staticmethod
391   def perspective_blockdev_export(params):
392     """Compute the sizes of the given block devices.
393
394     """
395     disk = objects.Disk.FromDict(params[0])
396     dest_node, dest_path, cluster_name = params[1:]
397     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
398
399   @staticmethod
400   def perspective_blockdev_setinfo(params):
401     """Sets metadata information on the given block device.
402
403     """
404     (disk, info) = params
405     disk = objects.Disk.FromDict(disk)
406     return backend.BlockdevSetInfo(disk, info)
407
408   # blockdev/drbd specific methods ----------
409
410   @staticmethod
411   def perspective_drbd_disconnect_net(params):
412     """Disconnects the network connection of drbd disks.
413
414     Note that this is only valid for drbd disks, so the members of the
415     disk list must all be drbd devices.
416
417     """
418     nodes_ip, disks, target_node_uuid = params
419     disks = [objects.Disk.FromDict(cf) for cf in disks]
420     return backend.DrbdDisconnectNet(target_node_uuid, nodes_ip, disks)
421
422   @staticmethod
423   def perspective_drbd_attach_net(params):
424     """Attaches the network connection of drbd disks.
425
426     Note that this is only valid for drbd disks, so the members of the
427     disk list must all be drbd devices.
428
429     """
430     nodes_ip, disks, instance_name, multimaster, target_node_uuid = params
431     disks = [objects.Disk.FromDict(cf) for cf in disks]
432     return backend.DrbdAttachNet(target_node_uuid, nodes_ip, disks,
433                                  instance_name, multimaster)
434
435   @staticmethod
436   def perspective_drbd_wait_sync(params):
437     """Wait until DRBD disks are synched.
438
439     Note that this is only valid for drbd disks, so the members of the
440     disk list must all be drbd devices.
441
442     """
443     nodes_ip, disks, target_node_uuid = params
444     disks = [objects.Disk.FromDict(cf) for cf in disks]
445     return backend.DrbdWaitSync(target_node_uuid, nodes_ip, disks)
446
447   @staticmethod
448   def perspective_drbd_needs_activation(params):
449     """Checks if the drbd devices need activation
450
451     Note that this is only valid for drbd disks, so the members of the
452     disk list must all be drbd devices.
453
454     """
455     nodes_ip, disks, target_node_uuid = params
456     disks = [objects.Disk.FromDict(cf) for cf in disks]
457     return backend.DrbdNeedsActivation(target_node_uuid, nodes_ip, disks)
458
459   @staticmethod
460   def perspective_drbd_helper(params):
461     """Query drbd helper.
462
463     """
464     return backend.GetDrbdUsermodeHelper()
465
466   # export/import  --------------------------
467
468   @staticmethod
469   def perspective_finalize_export(params):
470     """Expose the finalize export functionality.
471
472     """
473     instance = objects.Instance.FromDict(params[0])
474
475     snap_disks = []
476     for disk in params[1]:
477       if isinstance(disk, bool):
478         snap_disks.append(disk)
479       else:
480         snap_disks.append(objects.Disk.FromDict(disk))
481
482     return backend.FinalizeExport(instance, snap_disks)
483
484   @staticmethod
485   def perspective_export_info(params):
486     """Query information about an existing export on this node.
487
488     The given path may not contain an export, in which case we return
489     None.
490
491     """
492     path = params[0]
493     return backend.ExportInfo(path)
494
495   @staticmethod
496   def perspective_export_list(params):
497     """List the available exports on this node.
498
499     Note that as opposed to export_info, which may query data about an
500     export in any path, this only queries the standard Ganeti path
501     (pathutils.EXPORT_DIR).
502
503     """
504     return backend.ListExports()
505
506   @staticmethod
507   def perspective_export_remove(params):
508     """Remove an export.
509
510     """
511     export = params[0]
512     return backend.RemoveExport(export)
513
514   # block device ---------------------
515   @staticmethod
516   def perspective_bdev_sizes(params):
517     """Query the list of block devices
518
519     """
520     devices = params[0]
521     return backend.GetBlockDevSizes(devices)
522
523   # volume  --------------------------
524
525   @staticmethod
526   def perspective_lv_list(params):
527     """Query the list of logical volumes in a given volume group.
528
529     """
530     vgname = params[0]
531     return backend.GetVolumeList(vgname)
532
533   @staticmethod
534   def perspective_vg_list(params):
535     """Query the list of volume groups.
536
537     """
538     return backend.ListVolumeGroups()
539
540   # Storage --------------------------
541
542   @staticmethod
543   def perspective_storage_list(params):
544     """Get list of storage units.
545
546     """
547     (su_name, su_args, name, fields) = params
548     return container.GetStorage(su_name, *su_args).List(name, fields)
549
550   @staticmethod
551   def perspective_storage_modify(params):
552     """Modify a storage unit.
553
554     """
555     (su_name, su_args, name, changes) = params
556     return container.GetStorage(su_name, *su_args).Modify(name, changes)
557
558   @staticmethod
559   def perspective_storage_execute(params):
560     """Execute an operation on a storage unit.
561
562     """
563     (su_name, su_args, name, op) = params
564     return container.GetStorage(su_name, *su_args).Execute(name, op)
565
566   # bridge  --------------------------
567
568   @staticmethod
569   def perspective_bridges_exist(params):
570     """Check if all bridges given exist on this node.
571
572     """
573     bridges_list = params[0]
574     return backend.BridgesExist(bridges_list)
575
576   # instance  --------------------------
577
578   @staticmethod
579   def perspective_instance_os_add(params):
580     """Install an OS on a given instance.
581
582     """
583     inst_s = params[0]
584     inst = objects.Instance.FromDict(inst_s)
585     reinstall = params[1]
586     debug = params[2]
587     return backend.InstanceOsAdd(inst, reinstall, debug)
588
589   @staticmethod
590   def perspective_instance_run_rename(params):
591     """Runs the OS rename script for an instance.
592
593     """
594     inst_s, old_name, debug = params
595     inst = objects.Instance.FromDict(inst_s)
596     return backend.RunRenameInstance(inst, old_name, debug)
597
598   @staticmethod
599   def perspective_instance_shutdown(params):
600     """Shutdown an instance.
601
602     """
603     instance = objects.Instance.FromDict(params[0])
604     timeout = params[1]
605     trail = params[2]
606     _extendReasonTrail(trail, "shutdown")
607     return backend.InstanceShutdown(instance, timeout, trail)
608
609   @staticmethod
610   def perspective_instance_start(params):
611     """Start an instance.
612
613     """
614     (instance_name, startup_paused, trail) = params
615     instance = objects.Instance.FromDict(instance_name)
616     _extendReasonTrail(trail, "start")
617     return backend.StartInstance(instance, startup_paused, trail)
618
619   @staticmethod
620   def perspective_migration_info(params):
621     """Gather information about an instance to be migrated.
622
623     """
624     instance = objects.Instance.FromDict(params[0])
625     return backend.MigrationInfo(instance)
626
627   @staticmethod
628   def perspective_accept_instance(params):
629     """Prepare the node to accept an instance.
630
631     """
632     instance, info, target = params
633     instance = objects.Instance.FromDict(instance)
634     return backend.AcceptInstance(instance, info, target)
635
636   @staticmethod
637   def perspective_instance_finalize_migration_dst(params):
638     """Finalize the instance migration on the destination node.
639
640     """
641     instance, info, success = params
642     instance = objects.Instance.FromDict(instance)
643     return backend.FinalizeMigrationDst(instance, info, success)
644
645   @staticmethod
646   def perspective_instance_migrate(params):
647     """Migrates an instance.
648
649     """
650     cluster_name, instance, target, live = params
651     instance = objects.Instance.FromDict(instance)
652     return backend.MigrateInstance(cluster_name, instance, target, live)
653
654   @staticmethod
655   def perspective_instance_finalize_migration_src(params):
656     """Finalize the instance migration on the source node.
657
658     """
659     instance, success, live = params
660     instance = objects.Instance.FromDict(instance)
661     return backend.FinalizeMigrationSource(instance, success, live)
662
663   @staticmethod
664   def perspective_instance_get_migration_status(params):
665     """Reports migration status.
666
667     """
668     instance = objects.Instance.FromDict(params[0])
669     return backend.GetMigrationStatus(instance).ToDict()
670
671   @staticmethod
672   def perspective_instance_reboot(params):
673     """Reboot an instance.
674
675     """
676     instance = objects.Instance.FromDict(params[0])
677     reboot_type = params[1]
678     shutdown_timeout = params[2]
679     trail = params[3]
680     _extendReasonTrail(trail, "reboot")
681     return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
682                                   trail)
683
684   @staticmethod
685   def perspective_instance_balloon_memory(params):
686     """Modify instance runtime memory.
687
688     """
689     instance_dict, memory = params
690     instance = objects.Instance.FromDict(instance_dict)
691     return backend.InstanceBalloonMemory(instance, memory)
692
693   @staticmethod
694   def perspective_instance_info(params):
695     """Query instance information.
696
697     """
698     (instance_name, hypervisor_name, hvparams) = params
699     return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
700
701   @staticmethod
702   def perspective_instance_migratable(params):
703     """Query whether the specified instance can be migrated.
704
705     """
706     instance = objects.Instance.FromDict(params[0])
707     return backend.GetInstanceMigratable(instance)
708
709   @staticmethod
710   def perspective_all_instances_info(params):
711     """Query information about all instances.
712
713     """
714     (hypervisor_list, all_hvparams) = params
715     return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
716
717   @staticmethod
718   def perspective_instance_list(params):
719     """Query the list of running instances.
720
721     """
722     (hypervisor_list, hvparams) = params
723     return backend.GetInstanceList(hypervisor_list, hvparams)
724
725   # node --------------------------
726
727   @staticmethod
728   def perspective_node_has_ip_address(params):
729     """Checks if a node has the given ip address.
730
731     """
732     return netutils.IPAddress.Own(params[0])
733
734   @staticmethod
735   def perspective_node_info(params):
736     """Query node information.
737
738     """
739     (storage_units, hv_specs) = params
740     return backend.GetNodeInfo(storage_units, hv_specs)
741
742   @staticmethod
743   def perspective_etc_hosts_modify(params):
744     """Modify a node entry in /etc/hosts.
745
746     """
747     backend.EtcHostsModify(params[0], params[1], params[2])
748
749     return True
750
751   @staticmethod
752   def perspective_node_verify(params):
753     """Run a verify sequence on this node.
754
755     """
756     (what, cluster_name, hvparams) = params
757     return backend.VerifyNode(what, cluster_name, hvparams)
758
759   @classmethod
760   def perspective_node_verify_light(cls, params):
761     """Run a light verify sequence on this node.
762
763     """
764     # So far it's the same as the normal node_verify
765     return cls.perspective_node_verify(params)
766
767   @staticmethod
768   def perspective_node_start_master_daemons(params):
769     """Start the master daemons on this node.
770
771     """
772     return backend.StartMasterDaemons(params[0])
773
774   @staticmethod
775   def perspective_node_activate_master_ip(params):
776     """Activate the master IP on this node.
777
778     """
779     master_params = objects.MasterNetworkParameters.FromDict(params[0])
780     return backend.ActivateMasterIp(master_params, params[1])
781
782   @staticmethod
783   def perspective_node_deactivate_master_ip(params):
784     """Deactivate the master IP on this node.
785
786     """
787     master_params = objects.MasterNetworkParameters.FromDict(params[0])
788     return backend.DeactivateMasterIp(master_params, params[1])
789
790   @staticmethod
791   def perspective_node_stop_master(params):
792     """Stops master daemons on this node.
793
794     """
795     return backend.StopMasterDaemons()
796
797   @staticmethod
798   def perspective_node_change_master_netmask(params):
799     """Change the master IP netmask.
800
801     """
802     return backend.ChangeMasterNetmask(params[0], params[1], params[2],
803                                        params[3])
804
805   @staticmethod
806   def perspective_node_leave_cluster(params):
807     """Cleanup after leaving a cluster.
808
809     """
810     return backend.LeaveCluster(params[0])
811
812   @staticmethod
813   def perspective_node_volumes(params):
814     """Query the list of all logical volume groups.
815
816     """
817     return backend.NodeVolumes()
818
819   @staticmethod
820   def perspective_node_demote_from_mc(params):
821     """Demote a node from the master candidate role.
822
823     """
824     return backend.DemoteFromMC()
825
826   @staticmethod
827   def perspective_node_powercycle(params):
828     """Tries to powercycle the nod.
829
830     """
831     (hypervisor_type, hvparams) = params
832     return backend.PowercycleNode(hypervisor_type, hvparams)
833
834   # cluster --------------------------
835
836   @staticmethod
837   def perspective_version(params):
838     """Query version information.
839
840     """
841     return constants.PROTOCOL_VERSION
842
843   @staticmethod
844   def perspective_upload_file(params):
845     """Upload a file.
846
847     Note that the backend implementation imposes strict rules on which
848     files are accepted.
849
850     """
851     return backend.UploadFile(*(params[0]))
852
853   @staticmethod
854   def perspective_master_info(params):
855     """Query master information.
856
857     """
858     return backend.GetMasterInfo()
859
860   @staticmethod
861   def perspective_run_oob(params):
862     """Runs oob on node.
863
864     """
865     output = backend.RunOob(params[0], params[1], params[2], params[3])
866     if output:
867       result = serializer.LoadJson(output)
868     else:
869       result = None
870     return result
871
872   @staticmethod
873   def perspective_restricted_command(params):
874     """Runs a restricted command.
875
876     """
877     (cmd, ) = params
878
879     return backend.RunRestrictedCmd(cmd)
880
881   @staticmethod
882   def perspective_write_ssconf_files(params):
883     """Write ssconf files.
884
885     """
886     (values,) = params
887     return ssconf.WriteSsconfFiles(values)
888
889   @staticmethod
890   def perspective_get_watcher_pause(params):
891     """Get watcher pause end.
892
893     """
894     return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
895
896   @staticmethod
897   def perspective_set_watcher_pause(params):
898     """Set watcher pause.
899
900     """
901     (until, ) = params
902     return backend.SetWatcherPause(until)
903
904   # os -----------------------
905
906   @staticmethod
907   def perspective_os_diagnose(params):
908     """Query detailed information about existing OSes.
909
910     """
911     return backend.DiagnoseOS()
912
913   @staticmethod
914   def perspective_os_get(params):
915     """Query information about a given OS.
916
917     """
918     name = params[0]
919     os_obj = backend.OSFromDisk(name)
920     return os_obj.ToDict()
921
922   @staticmethod
923   def perspective_os_validate(params):
924     """Run a given OS' validation routine.
925
926     """
927     required, name, checks, params = params
928     return backend.ValidateOS(required, name, checks, params)
929
930   # extstorage -----------------------
931
932   @staticmethod
933   def perspective_extstorage_diagnose(params):
934     """Query detailed information about existing extstorage providers.
935
936     """
937     return backend.DiagnoseExtStorage()
938
939   # hooks -----------------------
940
941   @staticmethod
942   def perspective_hooks_runner(params):
943     """Run hook scripts.
944
945     """
946     hpath, phase, env = params
947     hr = backend.HooksRunner()
948     return hr.RunHooks(hpath, phase, env)
949
950   # iallocator -----------------
951
952   @staticmethod
953   def perspective_iallocator_runner(params):
954     """Run an iallocator script.
955
956     """
957     name, idata = params
958     iar = backend.IAllocatorRunner()
959     return iar.Run(name, idata)
960
961   # test -----------------------
962
963   @staticmethod
964   def perspective_test_delay(params):
965     """Run test delay.
966
967     """
968     duration = params[0]
969     status, rval = utils.TestDelay(duration)
970     if not status:
971       raise backend.RPCFail(rval)
972     return rval
973
974   # file storage ---------------
975
976   @staticmethod
977   def perspective_file_storage_dir_create(params):
978     """Create the file storage directory.
979
980     """
981     file_storage_dir = params[0]
982     return backend.CreateFileStorageDir(file_storage_dir)
983
984   @staticmethod
985   def perspective_file_storage_dir_remove(params):
986     """Remove the file storage directory.
987
988     """
989     file_storage_dir = params[0]
990     return backend.RemoveFileStorageDir(file_storage_dir)
991
992   @staticmethod
993   def perspective_file_storage_dir_rename(params):
994     """Rename the file storage directory.
995
996     """
997     old_file_storage_dir = params[0]
998     new_file_storage_dir = params[1]
999     return backend.RenameFileStorageDir(old_file_storage_dir,
1000                                         new_file_storage_dir)
1001
1002   # jobs ------------------------
1003
1004   @staticmethod
1005   @_RequireJobQueueLock
1006   def perspective_jobqueue_update(params):
1007     """Update job queue.
1008
1009     """
1010     (file_name, content) = params
1011     return backend.JobQueueUpdate(file_name, content)
1012
1013   @staticmethod
1014   @_RequireJobQueueLock
1015   def perspective_jobqueue_purge(params):
1016     """Purge job queue.
1017
1018     """
1019     return backend.JobQueuePurge()
1020
1021   @staticmethod
1022   @_RequireJobQueueLock
1023   def perspective_jobqueue_rename(params):
1024     """Rename a job queue file.
1025
1026     """
1027     # TODO: What if a file fails to rename?
1028     return [backend.JobQueueRename(old, new) for old, new in params[0]]
1029
1030   @staticmethod
1031   @_RequireJobQueueLock
1032   def perspective_jobqueue_set_drain_flag(params):
1033     """Set job queue's drain flag.
1034
1035     """
1036     (flag, ) = params
1037
1038     return jstore.SetDrainFlag(flag)
1039
1040   # hypervisor ---------------
1041
1042   @staticmethod
1043   def perspective_hypervisor_validate_params(params):
1044     """Validate the hypervisor parameters.
1045
1046     """
1047     (hvname, hvparams) = params
1048     return backend.ValidateHVParams(hvname, hvparams)
1049
1050   # Crypto
1051
1052   @staticmethod
1053   def perspective_x509_cert_create(params):
1054     """Creates a new X509 certificate for SSL/TLS.
1055
1056     """
1057     (validity, ) = params
1058     return backend.CreateX509Certificate(validity)
1059
1060   @staticmethod
1061   def perspective_x509_cert_remove(params):
1062     """Removes a X509 certificate.
1063
1064     """
1065     (name, ) = params
1066     return backend.RemoveX509Certificate(name)
1067
1068   # Import and export
1069
1070   @staticmethod
1071   def perspective_import_start(params):
1072     """Starts an import daemon.
1073
1074     """
1075     (opts_s, instance, component, (dest, dest_args)) = params
1076
1077     opts = objects.ImportExportOptions.FromDict(opts_s)
1078
1079     return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
1080                                            None, None,
1081                                            objects.Instance.FromDict(instance),
1082                                            component, dest,
1083                                            _DecodeImportExportIO(dest,
1084                                                                  dest_args))
1085
1086   @staticmethod
1087   def perspective_export_start(params):
1088     """Starts an export daemon.
1089
1090     """
1091     (opts_s, host, port, instance, component, (source, source_args)) = params
1092
1093     opts = objects.ImportExportOptions.FromDict(opts_s)
1094
1095     return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
1096                                            host, port,
1097                                            objects.Instance.FromDict(instance),
1098                                            component, source,
1099                                            _DecodeImportExportIO(source,
1100                                                                  source_args))
1101
1102   @staticmethod
1103   def perspective_impexp_status(params):
1104     """Retrieves the status of an import or export daemon.
1105
1106     """
1107     return backend.GetImportExportStatus(params[0])
1108
1109   @staticmethod
1110   def perspective_impexp_abort(params):
1111     """Aborts an import or export.
1112
1113     """
1114     return backend.AbortImportExport(params[0])
1115
1116   @staticmethod
1117   def perspective_impexp_cleanup(params):
1118     """Cleans up after an import or export.
1119
1120     """
1121     return backend.CleanupImportExport(params[0])
1122
1123
1124 def CheckNoded(_, args):
1125   """Initial checks whether to run or exit with a failure.
1126
1127   """
1128   if args: # noded doesn't take any arguments
1129     print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1130                           sys.argv[0])
1131     sys.exit(constants.EXIT_FAILURE)
1132   try:
1133     codecs.lookup("string-escape")
1134   except LookupError:
1135     print >> sys.stderr, ("Can't load the string-escape code which is part"
1136                           " of the Python installation. Is your installation"
1137                           " complete/correct? Aborting.")
1138     sys.exit(constants.EXIT_FAILURE)
1139
1140
1141 def PrepNoded(options, _):
1142   """Preparation node daemon function, executed with the PID file held.
1143
1144   """
1145   if options.mlock:
1146     request_executor_class = MlockallRequestExecutor
1147     try:
1148       utils.Mlockall()
1149     except errors.NoCtypesError:
1150       logging.warning("Cannot set memory lock, ctypes module not found")
1151       request_executor_class = http.server.HttpServerRequestExecutor
1152   else:
1153     request_executor_class = http.server.HttpServerRequestExecutor
1154
1155   # Read SSL certificate
1156   if options.ssl:
1157     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1158                                     ssl_cert_path=options.ssl_cert)
1159   else:
1160     ssl_params = None
1161
1162   err = _PrepareQueueLock()
1163   if err is not None:
1164     # this might be some kind of file-system/permission error; while
1165     # this breaks the job queue functionality, we shouldn't prevent
1166     # startup of the whole node daemon because of this
1167     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1168
1169   handler = NodeRequestHandler()
1170
1171   mainloop = daemon.Mainloop()
1172   server = \
1173     http.server.HttpServer(mainloop, options.bind_address, options.port,
1174                            handler, ssl_params=ssl_params, ssl_verify_peer=True,
1175                            request_executor_class=request_executor_class)
1176   server.Start()
1177
1178   return (mainloop, server)
1179
1180
1181 def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1182   """Main node daemon function, executed with the PID file held.
1183
1184   """
1185   (mainloop, server) = prep_data
1186   try:
1187     mainloop.Run()
1188   finally:
1189     server.Stop()
1190
1191
1192 def Main():
1193   """Main function for the node daemon.
1194
1195   """
1196   parser = OptionParser(description="Ganeti node daemon",
1197                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]\
1198                               \ [-i INTERFACE]",
1199                         version="%%prog (ganeti) %s" %
1200                         constants.RELEASE_VERSION)
1201   parser.add_option("--no-mlock", dest="mlock",
1202                     help="Do not mlock the node memory in ram",
1203                     default=True, action="store_false")
1204
1205   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1206                      default_ssl_cert=pathutils.NODED_CERT_FILE,
1207                      default_ssl_key=pathutils.NODED_CERT_FILE,
1208                      console_logging=True)