Handle None result from BlockdevFind
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
45 from ganeti import storage
46
47 import ganeti.http.server
48
49
50 queue_lock = None
51
52
53 def _RequireJobQueueLock(fn):
54   """Decorator for job queue manipulating functions.
55
56   """
57   QUEUE_LOCK_TIMEOUT = 10
58
59   def wrapper(*args, **kwargs):
60     # Locking in exclusive, blocking mode because there could be several
61     # children running at the same time. Waiting up to 10 seconds.
62     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
63     try:
64       return fn(*args, **kwargs)
65     finally:
66       queue_lock.Unlock()
67
68   return wrapper
69
70
71 class NodeHttpServer(http.server.HttpServer):
72   """The server implementation.
73
74   This class holds all methods exposed over the RPC interface.
75
76   """
77   def __init__(self, *args, **kwargs):
78     http.server.HttpServer.__init__(self, *args, **kwargs)
79     self.noded_pid = os.getpid()
80
81   def HandleRequest(self, req):
82     """Handle a request.
83
84     """
85     if req.request_method.upper() != http.HTTP_PUT:
86       raise http.HttpBadRequest()
87
88     path = req.request_path
89     if path.startswith("/"):
90       path = path[1:]
91
92     method = getattr(self, "perspective_%s" % path, None)
93     if method is None:
94       raise http.HttpNotFound()
95
96     try:
97       rvalue = method(req.request_body)
98       return True, rvalue
99
100     except backend.RPCFail, err:
101       # our custom failure exception; str(err) works fine if the
102       # exception was constructed with a single argument, and in
103       # this case, err.message == err.args[0] == str(err)
104       return (False, str(err))
105     except errors.QuitGanetiException, err:
106       # Tell parent to quit
107       logging.info("Shutting down the node daemon, arguments: %s",
108                    str(err.args))
109       os.kill(self.noded_pid, signal.SIGTERM)
110       # And return the error's arguments, which must be already in
111       # correct tuple format
112       return err.args
113     except Exception, err:
114       logging.exception("Error in RPC call")
115       return False, "Error while executing backend function: %s" % str(err)
116
117   # the new block devices  --------------------------
118
119   @staticmethod
120   def perspective_blockdev_create(params):
121     """Create a block device.
122
123     """
124     bdev_s, size, owner, on_primary, info = params
125     bdev = objects.Disk.FromDict(bdev_s)
126     if bdev is None:
127       raise ValueError("can't unserialize data!")
128     return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
129
130   @staticmethod
131   def perspective_blockdev_remove(params):
132     """Remove a block device.
133
134     """
135     bdev_s = params[0]
136     bdev = objects.Disk.FromDict(bdev_s)
137     return backend.BlockdevRemove(bdev)
138
139   @staticmethod
140   def perspective_blockdev_rename(params):
141     """Remove a block device.
142
143     """
144     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
145     return backend.BlockdevRename(devlist)
146
147   @staticmethod
148   def perspective_blockdev_assemble(params):
149     """Assemble a block device.
150
151     """
152     bdev_s, owner, on_primary = params
153     bdev = objects.Disk.FromDict(bdev_s)
154     if bdev is None:
155       raise ValueError("can't unserialize data!")
156     return backend.BlockdevAssemble(bdev, owner, on_primary)
157
158   @staticmethod
159   def perspective_blockdev_shutdown(params):
160     """Shutdown a block device.
161
162     """
163     bdev_s = params[0]
164     bdev = objects.Disk.FromDict(bdev_s)
165     if bdev is None:
166       raise ValueError("can't unserialize data!")
167     return backend.BlockdevShutdown(bdev)
168
169   @staticmethod
170   def perspective_blockdev_addchildren(params):
171     """Add a child to a mirror device.
172
173     Note: this is only valid for mirror devices. It's the caller's duty
174     to send a correct disk, otherwise we raise an error.
175
176     """
177     bdev_s, ndev_s = params
178     bdev = objects.Disk.FromDict(bdev_s)
179     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
180     if bdev is None or ndevs.count(None) > 0:
181       raise ValueError("can't unserialize data!")
182     return backend.BlockdevAddchildren(bdev, ndevs)
183
184   @staticmethod
185   def perspective_blockdev_removechildren(params):
186     """Remove a child from a mirror device.
187
188     This is only valid for mirror devices, of course. It's the callers
189     duty to send a correct disk, otherwise we raise an error.
190
191     """
192     bdev_s, ndev_s = params
193     bdev = objects.Disk.FromDict(bdev_s)
194     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
195     if bdev is None or ndevs.count(None) > 0:
196       raise ValueError("can't unserialize data!")
197     return backend.BlockdevRemovechildren(bdev, ndevs)
198
199   @staticmethod
200   def perspective_blockdev_getmirrorstatus(params):
201     """Return the mirror status for a list of disks.
202
203     """
204     disks = [objects.Disk.FromDict(dsk_s)
205              for dsk_s in params]
206     return [status.ToDict()
207             for status in backend.BlockdevGetmirrorstatus(disks)]
208
209   @staticmethod
210   def perspective_blockdev_find(params):
211     """Expose the FindBlockDevice functionality for a disk.
212
213     This will try to find but not activate a disk.
214
215     """
216     disk = objects.Disk.FromDict(params[0])
217
218     result = backend.BlockdevFind(disk)
219     if result is None:
220       return None
221
222     return result.ToDict()
223
224   @staticmethod
225   def perspective_blockdev_snapshot(params):
226     """Create a snapshot device.
227
228     Note that this is only valid for LVM disks, if we get passed
229     something else we raise an exception. The snapshot device can be
230     remove by calling the generic block device remove call.
231
232     """
233     cfbd = objects.Disk.FromDict(params[0])
234     return backend.BlockdevSnapshot(cfbd)
235
236   @staticmethod
237   def perspective_blockdev_grow(params):
238     """Grow a stack of devices.
239
240     """
241     cfbd = objects.Disk.FromDict(params[0])
242     amount = params[1]
243     return backend.BlockdevGrow(cfbd, amount)
244
245   @staticmethod
246   def perspective_blockdev_close(params):
247     """Closes the given block devices.
248
249     """
250     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
251     return backend.BlockdevClose(params[0], disks)
252
253   # blockdev/drbd specific methods ----------
254
255   @staticmethod
256   def perspective_drbd_disconnect_net(params):
257     """Disconnects the network connection of drbd disks.
258
259     Note that this is only valid for drbd disks, so the members of the
260     disk list must all be drbd devices.
261
262     """
263     nodes_ip, disks = params
264     disks = [objects.Disk.FromDict(cf) for cf in disks]
265     return backend.DrbdDisconnectNet(nodes_ip, disks)
266
267   @staticmethod
268   def perspective_drbd_attach_net(params):
269     """Attaches the network connection of drbd disks.
270
271     Note that this is only valid for drbd disks, so the members of the
272     disk list must all be drbd devices.
273
274     """
275     nodes_ip, disks, instance_name, multimaster = params
276     disks = [objects.Disk.FromDict(cf) for cf in disks]
277     return backend.DrbdAttachNet(nodes_ip, disks,
278                                      instance_name, multimaster)
279
280   @staticmethod
281   def perspective_drbd_wait_sync(params):
282     """Wait until DRBD disks are synched.
283
284     Note that this is only valid for drbd disks, so the members of the
285     disk list must all be drbd devices.
286
287     """
288     nodes_ip, disks = params
289     disks = [objects.Disk.FromDict(cf) for cf in disks]
290     return backend.DrbdWaitSync(nodes_ip, disks)
291
292   # export/import  --------------------------
293
294   @staticmethod
295   def perspective_snapshot_export(params):
296     """Export a given snapshot.
297
298     """
299     disk = objects.Disk.FromDict(params[0])
300     dest_node = params[1]
301     instance = objects.Instance.FromDict(params[2])
302     cluster_name = params[3]
303     dev_idx = params[4]
304     return backend.ExportSnapshot(disk, dest_node, instance,
305                                   cluster_name, dev_idx)
306
307   @staticmethod
308   def perspective_finalize_export(params):
309     """Expose the finalize export functionality.
310
311     """
312     instance = objects.Instance.FromDict(params[0])
313     snap_disks = [objects.Disk.FromDict(str_data)
314                   for str_data in params[1]]
315     return backend.FinalizeExport(instance, snap_disks)
316
317   @staticmethod
318   def perspective_export_info(params):
319     """Query information about an existing export on this node.
320
321     The given path may not contain an export, in which case we return
322     None.
323
324     """
325     path = params[0]
326     return backend.ExportInfo(path)
327
328   @staticmethod
329   def perspective_export_list(params):
330     """List the available exports on this node.
331
332     Note that as opposed to export_info, which may query data about an
333     export in any path, this only queries the standard Ganeti path
334     (constants.EXPORT_DIR).
335
336     """
337     return backend.ListExports()
338
339   @staticmethod
340   def perspective_export_remove(params):
341     """Remove an export.
342
343     """
344     export = params[0]
345     return backend.RemoveExport(export)
346
347   # volume  --------------------------
348
349   @staticmethod
350   def perspective_lv_list(params):
351     """Query the list of logical volumes in a given volume group.
352
353     """
354     vgname = params[0]
355     return backend.GetVolumeList(vgname)
356
357   @staticmethod
358   def perspective_vg_list(params):
359     """Query the list of volume groups.
360
361     """
362     return backend.ListVolumeGroups()
363
364   # Storage --------------------------
365
366   @staticmethod
367   def perspective_storage_list(params):
368     """Get list of storage units.
369
370     """
371     (su_name, su_args, name, fields) = params
372     return storage.GetStorage(su_name, *su_args).List(name, fields)
373
374   @staticmethod
375   def perspective_storage_modify(params):
376     """Modify a storage unit.
377
378     """
379     (su_name, su_args, name, changes) = params
380     return storage.GetStorage(su_name, *su_args).Modify(name, changes)
381
382   # bridge  --------------------------
383
384   @staticmethod
385   def perspective_bridges_exist(params):
386     """Check if all bridges given exist on this node.
387
388     """
389     bridges_list = params[0]
390     return backend.BridgesExist(bridges_list)
391
392   # instance  --------------------------
393
394   @staticmethod
395   def perspective_instance_os_add(params):
396     """Install an OS on a given instance.
397
398     """
399     inst_s = params[0]
400     inst = objects.Instance.FromDict(inst_s)
401     reinstall = params[1]
402     return backend.InstanceOsAdd(inst, reinstall)
403
404   @staticmethod
405   def perspective_instance_run_rename(params):
406     """Runs the OS rename script for an instance.
407
408     """
409     inst_s, old_name = params
410     inst = objects.Instance.FromDict(inst_s)
411     return backend.RunRenameInstance(inst, old_name)
412
413   @staticmethod
414   def perspective_instance_os_import(params):
415     """Run the import function of an OS onto a given instance.
416
417     """
418     inst_s, src_node, src_images, cluster_name = params
419     inst = objects.Instance.FromDict(inst_s)
420     return backend.ImportOSIntoInstance(inst, src_node, src_images,
421                                         cluster_name)
422
423   @staticmethod
424   def perspective_instance_shutdown(params):
425     """Shutdown an instance.
426
427     """
428     instance = objects.Instance.FromDict(params[0])
429     return backend.InstanceShutdown(instance)
430
431   @staticmethod
432   def perspective_instance_start(params):
433     """Start an instance.
434
435     """
436     instance = objects.Instance.FromDict(params[0])
437     return backend.StartInstance(instance)
438
439   @staticmethod
440   def perspective_migration_info(params):
441     """Gather information about an instance to be migrated.
442
443     """
444     instance = objects.Instance.FromDict(params[0])
445     return backend.MigrationInfo(instance)
446
447   @staticmethod
448   def perspective_accept_instance(params):
449     """Prepare the node to accept an instance.
450
451     """
452     instance, info, target = params
453     instance = objects.Instance.FromDict(instance)
454     return backend.AcceptInstance(instance, info, target)
455
456   @staticmethod
457   def perspective_finalize_migration(params):
458     """Finalize the instance migration.
459
460     """
461     instance, info, success = params
462     instance = objects.Instance.FromDict(instance)
463     return backend.FinalizeMigration(instance, info, success)
464
465   @staticmethod
466   def perspective_instance_migrate(params):
467     """Migrates an instance.
468
469     """
470     instance, target, live = params
471     instance = objects.Instance.FromDict(instance)
472     return backend.MigrateInstance(instance, target, live)
473
474   @staticmethod
475   def perspective_instance_reboot(params):
476     """Reboot an instance.
477
478     """
479     instance = objects.Instance.FromDict(params[0])
480     reboot_type = params[1]
481     return backend.InstanceReboot(instance, reboot_type)
482
483   @staticmethod
484   def perspective_instance_info(params):
485     """Query instance information.
486
487     """
488     return backend.GetInstanceInfo(params[0], params[1])
489
490   @staticmethod
491   def perspective_instance_migratable(params):
492     """Query whether the specified instance can be migrated.
493
494     """
495     instance = objects.Instance.FromDict(params[0])
496     return backend.GetInstanceMigratable(instance)
497
498   @staticmethod
499   def perspective_all_instances_info(params):
500     """Query information about all instances.
501
502     """
503     return backend.GetAllInstancesInfo(params[0])
504
505   @staticmethod
506   def perspective_instance_list(params):
507     """Query the list of running instances.
508
509     """
510     return backend.GetInstanceList(params[0])
511
512   # node --------------------------
513
514   @staticmethod
515   def perspective_node_tcp_ping(params):
516     """Do a TcpPing on the remote node.
517
518     """
519     return utils.TcpPing(params[1], params[2], timeout=params[3],
520                          live_port_needed=params[4], source=params[0])
521
522   @staticmethod
523   def perspective_node_has_ip_address(params):
524     """Checks if a node has the given ip address.
525
526     """
527     return utils.OwnIpAddress(params[0])
528
529   @staticmethod
530   def perspective_node_info(params):
531     """Query node information.
532
533     """
534     vgname, hypervisor_type = params
535     return backend.GetNodeInfo(vgname, hypervisor_type)
536
537   @staticmethod
538   def perspective_node_add(params):
539     """Complete the registration of this node in the cluster.
540
541     """
542     return backend.AddNode(params[0], params[1], params[2],
543                            params[3], params[4], params[5])
544
545   @staticmethod
546   def perspective_node_verify(params):
547     """Run a verify sequence on this node.
548
549     """
550     return backend.VerifyNode(params[0], params[1])
551
552   @staticmethod
553   def perspective_node_start_master(params):
554     """Promote this node to master status.
555
556     """
557     return backend.StartMaster(params[0], params[1])
558
559   @staticmethod
560   def perspective_node_stop_master(params):
561     """Demote this node from master status.
562
563     """
564     return backend.StopMaster(params[0])
565
566   @staticmethod
567   def perspective_node_leave_cluster(params):
568     """Cleanup after leaving a cluster.
569
570     """
571     return backend.LeaveCluster()
572
573   @staticmethod
574   def perspective_node_volumes(params):
575     """Query the list of all logical volume groups.
576
577     """
578     return backend.NodeVolumes()
579
580   @staticmethod
581   def perspective_node_demote_from_mc(params):
582     """Demote a node from the master candidate role.
583
584     """
585     return backend.DemoteFromMC()
586
587
588   @staticmethod
589   def perspective_node_powercycle(params):
590     """Tries to powercycle the nod.
591
592     """
593     hypervisor_type = params[0]
594     return backend.PowercycleNode(hypervisor_type)
595
596
597   # cluster --------------------------
598
599   @staticmethod
600   def perspective_version(params):
601     """Query version information.
602
603     """
604     return constants.PROTOCOL_VERSION
605
606   @staticmethod
607   def perspective_upload_file(params):
608     """Upload a file.
609
610     Note that the backend implementation imposes strict rules on which
611     files are accepted.
612
613     """
614     return backend.UploadFile(*params)
615
616   @staticmethod
617   def perspective_master_info(params):
618     """Query master information.
619
620     """
621     return backend.GetMasterInfo()
622
623   @staticmethod
624   def perspective_write_ssconf_files(params):
625     """Write ssconf files.
626
627     """
628     (values,) = params
629     return backend.WriteSsconfFiles(values)
630
631   # os -----------------------
632
633   @staticmethod
634   def perspective_os_diagnose(params):
635     """Query detailed information about existing OSes.
636
637     """
638     return backend.DiagnoseOS()
639
640   @staticmethod
641   def perspective_os_get(params):
642     """Query information about a given OS.
643
644     """
645     name = params[0]
646     os_obj = backend.OSFromDisk(name)
647     return os_obj.ToDict()
648
649   # hooks -----------------------
650
651   @staticmethod
652   def perspective_hooks_runner(params):
653     """Run hook scripts.
654
655     """
656     hpath, phase, env = params
657     hr = backend.HooksRunner()
658     return hr.RunHooks(hpath, phase, env)
659
660   # iallocator -----------------
661
662   @staticmethod
663   def perspective_iallocator_runner(params):
664     """Run an iallocator script.
665
666     """
667     name, idata = params
668     iar = backend.IAllocatorRunner()
669     return iar.Run(name, idata)
670
671   # test -----------------------
672
673   @staticmethod
674   def perspective_test_delay(params):
675     """Run test delay.
676
677     """
678     duration = params[0]
679     status, rval = utils.TestDelay(duration)
680     if not status:
681       raise backend.RPCFail(rval)
682     return rval
683
684   # file storage ---------------
685
686   @staticmethod
687   def perspective_file_storage_dir_create(params):
688     """Create the file storage directory.
689
690     """
691     file_storage_dir = params[0]
692     return backend.CreateFileStorageDir(file_storage_dir)
693
694   @staticmethod
695   def perspective_file_storage_dir_remove(params):
696     """Remove the file storage directory.
697
698     """
699     file_storage_dir = params[0]
700     return backend.RemoveFileStorageDir(file_storage_dir)
701
702   @staticmethod
703   def perspective_file_storage_dir_rename(params):
704     """Rename the file storage directory.
705
706     """
707     old_file_storage_dir = params[0]
708     new_file_storage_dir = params[1]
709     return backend.RenameFileStorageDir(old_file_storage_dir,
710                                         new_file_storage_dir)
711
712   # jobs ------------------------
713
714   @staticmethod
715   @_RequireJobQueueLock
716   def perspective_jobqueue_update(params):
717     """Update job queue.
718
719     """
720     (file_name, content) = params
721     return backend.JobQueueUpdate(file_name, content)
722
723   @staticmethod
724   @_RequireJobQueueLock
725   def perspective_jobqueue_purge(params):
726     """Purge job queue.
727
728     """
729     return backend.JobQueuePurge()
730
731   @staticmethod
732   @_RequireJobQueueLock
733   def perspective_jobqueue_rename(params):
734     """Rename a job queue file.
735
736     """
737     # TODO: What if a file fails to rename?
738     return [backend.JobQueueRename(old, new) for old, new in params]
739
740   @staticmethod
741   def perspective_jobqueue_set_drain(params):
742     """Set/unset the queue drain flag.
743
744     """
745     drain_flag = params[0]
746     return backend.JobQueueSetDrainFlag(drain_flag)
747
748
749   # hypervisor ---------------
750
751   @staticmethod
752   def perspective_hypervisor_validate_params(params):
753     """Validate the hypervisor parameters.
754
755     """
756     (hvname, hvparams) = params
757     return backend.ValidateHVParams(hvname, hvparams)
758
759
760 def ExecNODED(options, args):
761   """Main NODED function, executed with the pidfile held.
762
763   """
764   global queue_lock
765
766   # Read SSL certificate
767   if options.ssl:
768     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
769                                     ssl_cert_path=options.ssl_cert)
770   else:
771     ssl_params = None
772
773   # Prepare job queue
774   queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
775
776   mainloop = daemon.Mainloop()
777   server = NodeHttpServer(mainloop, options.bind_address, options.port,
778                           ssl_params=ssl_params, ssl_verify_peer=True)
779   server.Start()
780   try:
781     mainloop.Run()
782   finally:
783     server.Stop()
784
785
786 def main():
787   """Main function for the node daemon.
788
789   """
790   parser = OptionParser(description="Ganeti node daemon",
791                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
792                         version="%%prog (ganeti) %s" %
793                         constants.RELEASE_VERSION)
794   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
795   dirs.append((constants.LOG_OS_DIR, 0750))
796   dirs.append((constants.LOCK_DIR, 1777))
797   daemon.GenericMain(constants.NODED, parser, dirs, None, ExecNODED)
798
799
800 if __name__ == '__main__':
801   main()