Fix _AdjustCandidatePool
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import SocketServer
31 import errno
32 import logging
33 import signal
34
35 from optparse import OptionParser
36
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
45
46 import ganeti.http.server
47
48
49 queue_lock = None
50
51
52 def _RequireJobQueueLock(fn):
53   """Decorator for job queue manipulating functions.
54
55   """
56   QUEUE_LOCK_TIMEOUT = 10
57
58   def wrapper(*args, **kwargs):
59     # Locking in exclusive, blocking mode because there could be several
60     # children running at the same time. Waiting up to 10 seconds.
61     queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
62     try:
63       return fn(*args, **kwargs)
64     finally:
65       queue_lock.Unlock()
66
67   return wrapper
68
69
70 class NodeHttpServer(http.server.HttpServer):
71   """The server implementation.
72
73   This class holds all methods exposed over the RPC interface.
74
75   """
76   def __init__(self, *args, **kwargs):
77     http.server.HttpServer.__init__(self, *args, **kwargs)
78     self.noded_pid = os.getpid()
79
80   def HandleRequest(self, req):
81     """Handle a request.
82
83     """
84     if req.request_method.upper() != http.HTTP_PUT:
85       raise http.HttpBadRequest()
86
87     path = req.request_path
88     if path.startswith("/"):
89       path = path[1:]
90
91     method = getattr(self, "perspective_%s" % path, None)
92     if method is None:
93       raise http.HttpNotFound()
94
95     try:
96       try:
97         return method(req.request_body)
98       except:
99         logging.exception("Error in RPC call")
100         raise
101     except errors.QuitGanetiException, err:
102       # Tell parent to quit
103       os.kill(self.noded_pid, signal.SIGTERM)
104
105   # the new block devices  --------------------------
106
107   @staticmethod
108   def perspective_blockdev_create(params):
109     """Create a block device.
110
111     """
112     bdev_s, size, owner, on_primary, info = params
113     bdev = objects.Disk.FromDict(bdev_s)
114     if bdev is None:
115       raise ValueError("can't unserialize data!")
116     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
117
118   @staticmethod
119   def perspective_blockdev_remove(params):
120     """Remove a block device.
121
122     """
123     bdev_s = params[0]
124     bdev = objects.Disk.FromDict(bdev_s)
125     return backend.RemoveBlockDevice(bdev)
126
127   @staticmethod
128   def perspective_blockdev_rename(params):
129     """Remove a block device.
130
131     """
132     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
133     return backend.RenameBlockDevices(devlist)
134
135   @staticmethod
136   def perspective_blockdev_assemble(params):
137     """Assemble a block device.
138
139     """
140     bdev_s, owner, on_primary = params
141     bdev = objects.Disk.FromDict(bdev_s)
142     if bdev is None:
143       raise ValueError("can't unserialize data!")
144     return backend.AssembleBlockDevice(bdev, owner, on_primary)
145
146   @staticmethod
147   def perspective_blockdev_shutdown(params):
148     """Shutdown a block device.
149
150     """
151     bdev_s = params[0]
152     bdev = objects.Disk.FromDict(bdev_s)
153     if bdev is None:
154       raise ValueError("can't unserialize data!")
155     return backend.ShutdownBlockDevice(bdev)
156
157   @staticmethod
158   def perspective_blockdev_addchildren(params):
159     """Add a child to a mirror device.
160
161     Note: this is only valid for mirror devices. It's the caller's duty
162     to send a correct disk, otherwise we raise an error.
163
164     """
165     bdev_s, ndev_s = params
166     bdev = objects.Disk.FromDict(bdev_s)
167     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
168     if bdev is None or ndevs.count(None) > 0:
169       raise ValueError("can't unserialize data!")
170     return backend.MirrorAddChildren(bdev, ndevs)
171
172   @staticmethod
173   def perspective_blockdev_removechildren(params):
174     """Remove a child from a mirror device.
175
176     This is only valid for mirror devices, of course. It's the callers
177     duty to send a correct disk, otherwise we raise an error.
178
179     """
180     bdev_s, ndev_s = params
181     bdev = objects.Disk.FromDict(bdev_s)
182     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
183     if bdev is None or ndevs.count(None) > 0:
184       raise ValueError("can't unserialize data!")
185     return backend.MirrorRemoveChildren(bdev, ndevs)
186
187   @staticmethod
188   def perspective_blockdev_getmirrorstatus(params):
189     """Return the mirror status for a list of disks.
190
191     """
192     disks = [objects.Disk.FromDict(dsk_s)
193             for dsk_s in params]
194     return backend.GetMirrorStatus(disks)
195
196   @staticmethod
197   def perspective_blockdev_find(params):
198     """Expose the FindBlockDevice functionality for a disk.
199
200     This will try to find but not activate a disk.
201
202     """
203     disk = objects.Disk.FromDict(params[0])
204     return backend.FindBlockDevice(disk)
205
206   @staticmethod
207   def perspective_blockdev_snapshot(params):
208     """Create a snapshot device.
209
210     Note that this is only valid for LVM disks, if we get passed
211     something else we raise an exception. The snapshot device can be
212     remove by calling the generic block device remove call.
213
214     """
215     cfbd = objects.Disk.FromDict(params[0])
216     return backend.SnapshotBlockDevice(cfbd)
217
218   @staticmethod
219   def perspective_blockdev_grow(params):
220     """Grow a stack of devices.
221
222     """
223     cfbd = objects.Disk.FromDict(params[0])
224     amount = params[1]
225     return backend.GrowBlockDevice(cfbd, amount)
226
227   @staticmethod
228   def perspective_blockdev_close(params):
229     """Closes the given block devices.
230
231     """
232     disks = [objects.Disk.FromDict(cf) for cf in params]
233     return backend.CloseBlockDevices(disks)
234
235   # export/import  --------------------------
236
237   @staticmethod
238   def perspective_snapshot_export(params):
239     """Export a given snapshot.
240
241     """
242     disk = objects.Disk.FromDict(params[0])
243     dest_node = params[1]
244     instance = objects.Instance.FromDict(params[2])
245     cluster_name = params[3]
246     dev_idx = params[4]
247     return backend.ExportSnapshot(disk, dest_node, instance,
248                                   cluster_name, dev_idx)
249
250   @staticmethod
251   def perspective_finalize_export(params):
252     """Expose the finalize export functionality.
253
254     """
255     instance = objects.Instance.FromDict(params[0])
256     snap_disks = [objects.Disk.FromDict(str_data)
257                   for str_data in params[1]]
258     return backend.FinalizeExport(instance, snap_disks)
259
260   @staticmethod
261   def perspective_export_info(params):
262     """Query information about an existing export on this node.
263
264     The given path may not contain an export, in which case we return
265     None.
266
267     """
268     path = params[0]
269     einfo = backend.ExportInfo(path)
270     if einfo is None:
271       return einfo
272     return einfo.Dumps()
273
274   @staticmethod
275   def perspective_export_list(params):
276     """List the available exports on this node.
277
278     Note that as opposed to export_info, which may query data about an
279     export in any path, this only queries the standard Ganeti path
280     (constants.EXPORT_DIR).
281
282     """
283     return backend.ListExports()
284
285   @staticmethod
286   def perspective_export_remove(params):
287     """Remove an export.
288
289     """
290     export = params[0]
291     return backend.RemoveExport(export)
292
293   # volume  --------------------------
294
295   @staticmethod
296   def perspective_volume_list(params):
297     """Query the list of logical volumes in a given volume group.
298
299     """
300     vgname = params[0]
301     return backend.GetVolumeList(vgname)
302
303   @staticmethod
304   def perspective_vg_list(params):
305     """Query the list of volume groups.
306
307     """
308     return backend.ListVolumeGroups()
309
310   # bridge  --------------------------
311
312   @staticmethod
313   def perspective_bridges_exist(params):
314     """Check if all bridges given exist on this node.
315
316     """
317     bridges_list = params[0]
318     return backend.BridgesExist(bridges_list)
319
320   # instance  --------------------------
321
322   @staticmethod
323   def perspective_instance_os_add(params):
324     """Install an OS on a given instance.
325
326     """
327     inst_s = params[0]
328     inst = objects.Instance.FromDict(inst_s)
329     return backend.AddOSToInstance(inst)
330
331   @staticmethod
332   def perspective_instance_run_rename(params):
333     """Runs the OS rename script for an instance.
334
335     """
336     inst_s, old_name = params
337     inst = objects.Instance.FromDict(inst_s)
338     return backend.RunRenameInstance(inst, old_name)
339
340   @staticmethod
341   def perspective_instance_os_import(params):
342     """Run the import function of an OS onto a given instance.
343
344     """
345     inst_s, src_node, src_images, cluster_name = params
346     inst = objects.Instance.FromDict(inst_s)
347     return backend.ImportOSIntoInstance(inst, src_node, src_images,
348                                         cluster_name)
349
350   @staticmethod
351   def perspective_instance_shutdown(params):
352     """Shutdown an instance.
353
354     """
355     instance = objects.Instance.FromDict(params[0])
356     return backend.ShutdownInstance(instance)
357
358   @staticmethod
359   def perspective_instance_start(params):
360     """Start an instance.
361
362     """
363     instance = objects.Instance.FromDict(params[0])
364     extra_args = params[1]
365     return backend.StartInstance(instance, extra_args)
366
367   @staticmethod
368   def perspective_instance_migrate(params):
369     """Migrates an instance.
370
371     """
372     instance, target, live = params
373     instance = objects.Instance.FromDict(instance)
374     return backend.MigrateInstance(instance, target, live)
375
376   @staticmethod
377   def perspective_instance_reboot(params):
378     """Reboot an instance.
379
380     """
381     instance = objects.Instance.FromDict(params[0])
382     reboot_type = params[1]
383     extra_args = params[2]
384     return backend.RebootInstance(instance, reboot_type, extra_args)
385
386   @staticmethod
387   def perspective_instance_info(params):
388     """Query instance information.
389
390     """
391     return backend.GetInstanceInfo(params[0], params[1])
392
393   @staticmethod
394   def perspective_all_instances_info(params):
395     """Query information about all instances.
396
397     """
398     return backend.GetAllInstancesInfo(params[0])
399
400   @staticmethod
401   def perspective_instance_list(params):
402     """Query the list of running instances.
403
404     """
405     return backend.GetInstanceList(params[0])
406
407   # node --------------------------
408
409   @staticmethod
410   def perspective_node_tcp_ping(params):
411     """Do a TcpPing on the remote node.
412
413     """
414     return utils.TcpPing(params[1], params[2], timeout=params[3],
415                          live_port_needed=params[4], source=params[0])
416
417   @staticmethod
418   def perspective_node_has_ip_address(params):
419     """Checks if a node has the given ip address.
420
421     """
422     return utils.OwnIpAddress(params[0])
423
424   @staticmethod
425   def perspective_node_info(params):
426     """Query node information.
427
428     """
429     vgname, hypervisor_type = params
430     return backend.GetNodeInfo(vgname, hypervisor_type)
431
432   @staticmethod
433   def perspective_node_add(params):
434     """Complete the registration of this node in the cluster.
435
436     """
437     return backend.AddNode(params[0], params[1], params[2],
438                            params[3], params[4], params[5])
439
440   @staticmethod
441   def perspective_node_verify(params):
442     """Run a verify sequence on this node.
443
444     """
445     return backend.VerifyNode(params[0], params[1])
446
447   @staticmethod
448   def perspective_node_start_master(params):
449     """Promote this node to master status.
450
451     """
452     return backend.StartMaster(params[0])
453
454   @staticmethod
455   def perspective_node_stop_master(params):
456     """Demote this node from master status.
457
458     """
459     return backend.StopMaster(params[0])
460
461   @staticmethod
462   def perspective_node_leave_cluster(params):
463     """Cleanup after leaving a cluster.
464
465     """
466     return backend.LeaveCluster()
467
468   @staticmethod
469   def perspective_node_volumes(params):
470     """Query the list of all logical volume groups.
471
472     """
473     return backend.NodeVolumes()
474
475   @staticmethod
476   def perspective_node_demote_from_mc(params):
477     """Demote a node from the master candidate role.
478
479     """
480     return backend.DemoteFromMC()
481
482
483   # cluster --------------------------
484
485   @staticmethod
486   def perspective_version(params):
487     """Query version information.
488
489     """
490     return constants.PROTOCOL_VERSION
491
492   @staticmethod
493   def perspective_upload_file(params):
494     """Upload a file.
495
496     Note that the backend implementation imposes strict rules on which
497     files are accepted.
498
499     """
500     return backend.UploadFile(*params)
501
502   @staticmethod
503   def perspective_master_info(params):
504     """Query master information.
505
506     """
507     return backend.GetMasterInfo()
508
509   @staticmethod
510   def perspective_write_ssconf_files(params):
511     """Write ssconf files.
512
513     """
514     (values,) = params
515     return backend.WriteSsconfFiles(values)
516
517   # os -----------------------
518
519   @staticmethod
520   def perspective_os_diagnose(params):
521     """Query detailed information about existing OSes.
522
523     """
524     return [os.ToDict() for os in backend.DiagnoseOS()]
525
526   @staticmethod
527   def perspective_os_get(params):
528     """Query information about a given OS.
529
530     """
531     name = params[0]
532     try:
533       os_obj = backend.OSFromDisk(name)
534     except errors.InvalidOS, err:
535       os_obj = objects.OS.FromInvalidOS(err)
536     return os_obj.ToDict()
537
538   # hooks -----------------------
539
540   @staticmethod
541   def perspective_hooks_runner(params):
542     """Run hook scripts.
543
544     """
545     hpath, phase, env = params
546     hr = backend.HooksRunner()
547     return hr.RunHooks(hpath, phase, env)
548
549   # iallocator -----------------
550
551   @staticmethod
552   def perspective_iallocator_runner(params):
553     """Run an iallocator script.
554
555     """
556     name, idata = params
557     iar = backend.IAllocatorRunner()
558     return iar.Run(name, idata)
559
560   # test -----------------------
561
562   @staticmethod
563   def perspective_test_delay(params):
564     """Run test delay.
565
566     """
567     duration = params[0]
568     return utils.TestDelay(duration)
569
570   # file storage ---------------
571
572   @staticmethod
573   def perspective_file_storage_dir_create(params):
574     """Create the file storage directory.
575
576     """
577     file_storage_dir = params[0]
578     return backend.CreateFileStorageDir(file_storage_dir)
579
580   @staticmethod
581   def perspective_file_storage_dir_remove(params):
582     """Remove the file storage directory.
583
584     """
585     file_storage_dir = params[0]
586     return backend.RemoveFileStorageDir(file_storage_dir)
587
588   @staticmethod
589   def perspective_file_storage_dir_rename(params):
590     """Rename the file storage directory.
591
592     """
593     old_file_storage_dir = params[0]
594     new_file_storage_dir = params[1]
595     return backend.RenameFileStorageDir(old_file_storage_dir,
596                                         new_file_storage_dir)
597
598   # jobs ------------------------
599
600   @staticmethod
601   @_RequireJobQueueLock
602   def perspective_jobqueue_update(params):
603     """Update job queue.
604
605     """
606     (file_name, content) = params
607     return backend.JobQueueUpdate(file_name, content)
608
609   @staticmethod
610   @_RequireJobQueueLock
611   def perspective_jobqueue_purge(params):
612     """Purge job queue.
613
614     """
615     return backend.JobQueuePurge()
616
617   @staticmethod
618   @_RequireJobQueueLock
619   def perspective_jobqueue_rename(params):
620     """Rename a job queue file.
621
622     """
623     (old, new) = params
624
625     return backend.JobQueueRename(old, new)
626
627   @staticmethod
628   def perspective_jobqueue_set_drain(params):
629     """Set/unset the queue drain flag.
630
631     """
632     drain_flag = params[0]
633     return backend.JobQueueSetDrainFlag(drain_flag)
634
635
636   # hypervisor ---------------
637
638   @staticmethod
639   def perspective_hypervisor_validate_params(params):
640     """Validate the hypervisor parameters.
641
642     """
643     (hvname, hvparams) = params
644     return backend.ValidateHVParams(hvname, hvparams)
645
646
647 def ParseOptions():
648   """Parse the command line options.
649
650   Returns:
651     (options, args) as from OptionParser.parse_args()
652
653   """
654   parser = OptionParser(description="Ganeti node daemon",
655                         usage="%prog [-f] [-d]",
656                         version="%%prog (ganeti) %s" %
657                         constants.RELEASE_VERSION)
658
659   parser.add_option("-f", "--foreground", dest="fork",
660                     help="Don't detach from the current terminal",
661                     default=True, action="store_false")
662   parser.add_option("-d", "--debug", dest="debug",
663                     help="Enable some debug messages",
664                     default=False, action="store_true")
665   options, args = parser.parse_args()
666   return options, args
667
668
669 def EnsureRuntimeEnvironment():
670   """Ensure our run-time environment is complete.
671
672   Currently this creates directories which could be missing, either
673   due to directories being on a tmpfs mount, or due to incomplete
674   packaging.
675
676   """
677   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
678   dirs.append((constants.LOG_OS_DIR, 0750))
679   for dir_name, dir_mode in dirs:
680     if not os.path.exists(dir_name):
681       try:
682         os.mkdir(dir_name, dir_mode)
683       except EnvironmentError, err:
684         if err.errno != errno.EEXIST:
685           print ("Node setup wrong, cannot create directory '%s': %s" %
686                  (dir_name, err))
687           sys.exit(5)
688     if not os.path.isdir(dir_name):
689       print ("Node setup wrong, '%s' is not a directory" % dir_name)
690       sys.exit(5)
691
692
693 def main():
694   """Main function for the node daemon.
695
696   """
697   global queue_lock
698
699   options, args = ParseOptions()
700   utils.debug = options.debug
701   for fname in (constants.SSL_CERT_FILE,):
702     if not os.path.isfile(fname):
703       print "config %s not there, will not run." % fname
704       sys.exit(5)
705
706   try:
707     port = utils.GetNodeDaemonPort()
708   except errors.ConfigurationError, err:
709     print "Cluster configuration incomplete: '%s'" % str(err)
710     sys.exit(5)
711
712   EnsureRuntimeEnvironment()
713
714   # become a daemon
715   if options.fork:
716     utils.Daemonize(logfile=constants.LOG_NODESERVER)
717
718   utils.WritePidFile(constants.NODED_PID)
719   try:
720     utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
721                        stderr_logging=not options.fork)
722     logging.info("ganeti node daemon startup")
723
724     # Read SSL certificate
725     ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
726                                     ssl_cert_path=constants.SSL_CERT_FILE)
727
728     # Prepare job queue
729     queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
730
731     mainloop = daemon.Mainloop()
732     server = NodeHttpServer(mainloop, "", port,
733                             ssl_params=ssl_params, ssl_verify_peer=True)
734     server.Start()
735     try:
736       mainloop.Run()
737     finally:
738       server.Stop()
739   finally:
740     utils.RemovePidFile(constants.NODED_PID)
741
742
743 if __name__ == '__main__':
744   main()