#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Ganeti node daemon"""

# functions in this module need to have a given name structure, so:
# pylint: disable-msg=C0103

import os
import sys
import traceback
import SocketServer
import errno
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import logger
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import http
from ganeti import utils


queue_lock = None


def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper


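# A usage sketch for the decorator above (illustrative addition, not original
# code): wrapping a function makes it run with the module-level queue_lock
# held for the duration of the call, the same way the perspective_jobqueue_*
# methods further down are wrapped, e.g.
#
#   @_RequireJobQueueLock
#   def _PurgeQueue():          # hypothetical helper, for illustration only
#     return backend.JobQueuePurge()
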
class NodeDaemonRequestHandler(http.HTTPRequestHandler):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  """
  def HandleRequest(self):
    """Handle a request.

    """
    if self.command.upper() != "PUT":
      raise http.HTTPBadRequest()

    path = self.path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HTTPNotFound()

    try:
      try:
        return method(self.post_data)
      except:
        logging.exception("Error in RPC call")
        raise
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      os.kill(self.server.noded_pid, signal.SIGTERM)

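  # Dispatch note (illustrative addition, not original code): the request
  # path selects the handler method, and the request body, available as
  # self.post_data, is passed to it as the params list, for example
  #
  #   PUT /version        ->  perspective_version(params)
  #   PUT /instance_list  ->  perspective_instance_list(params)
  #
  # and any path without a matching perspective_* method gets HTTPNotFound.
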
  # the new block devices  --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.RemoveBlockDevice(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.RenameBlockDevices(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.AssembleBlockDevice(bdev, owner, on_primary)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.ShutdownBlockDevice(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.MirrorAddChildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.MirrorRemoveChildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params]
    return backend.GetMirrorStatus(disks)

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])
    return backend.FindBlockDevice(disk)

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks; if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.SnapshotBlockDevice(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    return backend.GrowBlockDevice(cfbd, amount)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params]
    return backend.CloseBlockDevices(disks)

  # export/import  --------------------------

  @staticmethod
  def perspective_snapshot_export(params):
    """Export a given snapshot.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node = params[1]
    instance = objects.Instance.FromDict(params[2])
    cluster_name = params[3]
    return backend.ExportSnapshot(disk, dest_node, instance, cluster_name)

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])
    snap_disks = [objects.Disk.FromDict(str_data)
                  for str_data in params[1]]
    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    einfo = backend.ExportInfo(path)
    if einfo is None:
      return einfo
    return einfo.Dumps()

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # volume  --------------------------

  @staticmethod
  def perspective_volume_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # bridge  --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance  --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s, os_disk, swap_disk = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.AddOSToInstance(inst, os_disk, swap_disk)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name, os_disk, swap_disk = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)

  @staticmethod
  def perspective_instance_os_import(params):
    """Run the import function of an OS onto a given instance.

    """
    inst_s, os_disk, swap_disk, src_node, src_image, cluster_name = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
                                        src_node, src_image, cluster_name)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.ShutdownInstance(instance)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    extra_args = params[1]
    return backend.StartInstance(instance, extra_args)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    extra_args = params[2]
    return backend.RebootInstance(instance, reboot_type, extra_args)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0])

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    """
    return utils.TcpPing(params[1], params[2], timeout=params[3],
                         live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_node_add(params):
    """Complete the registration of this node in the cluster.

    """
    return backend.AddNode(params[0], params[1], params[2],
                           params[3], params[4], params[5])

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster()

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return [os.ToDict() for os in backend.DiagnoseOS()]

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    try:
      os_obj = backend.OSFromDisk(name)
    except errors.InvalidOS, err:
      os_obj = objects.OS.FromInvalidOS(err)
    return os_obj.ToDict()

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    return utils.TestDelay(duration)

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    (old, new) = params

    return backend.JobQueueRename(old, new)


class NodeDaemonHttpServer(http.HTTPServer):
  def __init__(self, server_address):
    http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
    self.noded_pid = os.getpid()

  def serve_forever(self):
    """Handle requests until told to quit."""
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not sighandler.called:
        self.handle_request()
      # TODO: There could be children running at this point
    finally:
      sighandler.Reset()


class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
  """Forking HTTP Server.

  This inherits from ForkingMixIn and HTTPServer in order to fork for each
  request we handle. This allows more requests to be handled concurrently.

  """


def ParseOptions():
  """Parse the command line options.

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  options, args = parser.parse_args()
  return options, args


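# Invocation note (illustrative addition, not original code): with no flags
# the daemon detaches from the terminal via utils.Daemonize below, while
# "ganeti-noded -f -d" keeps it in the foreground with debug messages, in
# which case logging is also sent to stderr (stderr_logging=not options.fork).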
def main():
  """Main function for the node daemon.

  """
  global queue_lock

  options, args = ParseOptions()
  utils.debug = options.debug
  for fname in (constants.SSL_CERT_FILE,):
    if not os.path.isfile(fname):
      print "config %s not there, will not run." % fname
      sys.exit(5)

  try:
    port = utils.GetNodeDaemonPort()
    pwdata = utils.GetNodeDaemonPassword()
  except errors.ConfigurationError, err:
    print "Cluster configuration incomplete: '%s'" % str(err)
    sys.exit(5)

  # create the various SUB_RUN_DIRS, if not existing, so that we handle the
  # situation where RUN_DIR is tmpfs
  for dir_name in constants.SUB_RUN_DIRS:
    if not os.path.exists(dir_name):
      try:
        os.mkdir(dir_name, 0755)
      except EnvironmentError, err:
        if err.errno != errno.EEXIST:
          print ("Node setup wrong, cannot create directory %s: %s" %
                 (dir_name, err))
          sys.exit(5)
    if not os.path.isdir(dir_name):
      print ("Node setup wrong, %s is not a directory" % dir_name)
      sys.exit(5)

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_NODESERVER)

  utils.WritePidFile(constants.NODED_PID)

  logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
                      stderr_logging=not options.fork)
  logging.info("ganeti node daemon startup")

  # Prepare job queue
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)

  if options.fork:
    server = ForkingHTTPServer(('', port))
  else:
    server = NodeDaemonHttpServer(('', port))

  try:
    server.serve_forever()
  finally:
    utils.RemovePidFile(constants.NODED_PID)


if __name__ == '__main__':
  main()