Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 506cff12

History | View | Annotate | Download (17.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import logger
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import errors
42
from ganeti import jstore
43
from ganeti import ssconf
44
from ganeti import http
45
from ganeti import utils
46

    
47

    
48
queue_lock = None
49

    
50

    
51
def _RequireJobQueueLock(fn):
52
  """Decorator for job queue manipulating functions.
53

    
54
  """
55
  def wrapper(*args, **kwargs):
56
    # Locking in exclusive, blocking mode because there could be several
57
    # children running at the same time. Waiting up to 10 seconds.
58
    queue_lock.Exclusive(blocking=True, timeout=10)
59
    try:
60
      return fn(*args, **kwargs)
61
    finally:
62
      queue_lock.Unlock()
63
  return wrapper
64

    
65

    
66
class NodeDaemonRequestHandler(http.HTTPRequestHandler):
67
  """The server implementation.
68

    
69
  This class holds all methods exposed over the RPC interface.
70

    
71
  """
72
  def HandleRequest(self):
73
    """Handle a request.
74

    
75
    """
76
    if self.command.upper() != "PUT":
77
      raise http.HTTPBadRequest()
78

    
79
    path = self.path
80
    if path.startswith("/"):
81
      path = path[1:]
82

    
83
    method = getattr(self, "perspective_%s" % path, None)
84
    if method is None:
85
      raise httperror.HTTPNotFound()
86

    
87
    try:
88
      try:
89
        return method(self.post_data)
90
      except:
91
        logging.exception("Error in RPC call")
92
        raise
93
    except errors.QuitGanetiException, err:
94
      # Tell parent to quit
95
      os.kill(self.server.noded_pid, signal.SIGTERM)
96

    
97
  # the new block devices  --------------------------
98

    
99
  @staticmethod
100
  def perspective_blockdev_create(params):
101
    """Create a block device.
102

    
103
    """
104
    bdev_s, size, owner, on_primary, info = params
105
    bdev = objects.Disk.FromDict(bdev_s)
106
    if bdev is None:
107
      raise ValueError("can't unserialize data!")
108
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
109

    
110
  @staticmethod
111
  def perspective_blockdev_remove(params):
112
    """Remove a block device.
113

    
114
    """
115
    bdev_s = params[0]
116
    bdev = objects.Disk.FromDict(bdev_s)
117
    return backend.RemoveBlockDevice(bdev)
118

    
119
  @staticmethod
120
  def perspective_blockdev_rename(params):
121
    """Remove a block device.
122

    
123
    """
124
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
125
    return backend.RenameBlockDevices(devlist)
126

    
127
  @staticmethod
128
  def perspective_blockdev_assemble(params):
129
    """Assemble a block device.
130

    
131
    """
132
    bdev_s, owner, on_primary = params
133
    bdev = objects.Disk.FromDict(bdev_s)
134
    if bdev is None:
135
      raise ValueError("can't unserialize data!")
136
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
137

    
138
  @staticmethod
139
  def perspective_blockdev_shutdown(params):
140
    """Shutdown a block device.
141

    
142
    """
143
    bdev_s = params[0]
144
    bdev = objects.Disk.FromDict(bdev_s)
145
    if bdev is None:
146
      raise ValueError("can't unserialize data!")
147
    return backend.ShutdownBlockDevice(bdev)
148

    
149
  @staticmethod
150
  def perspective_blockdev_addchildren(params):
151
    """Add a child to a mirror device.
152

    
153
    Note: this is only valid for mirror devices. It's the caller's duty
154
    to send a correct disk, otherwise we raise an error.
155

    
156
    """
157
    bdev_s, ndev_s = params
158
    bdev = objects.Disk.FromDict(bdev_s)
159
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
160
    if bdev is None or ndevs.count(None) > 0:
161
      raise ValueError("can't unserialize data!")
162
    return backend.MirrorAddChildren(bdev, ndevs)
163

    
164
  @staticmethod
165
  def perspective_blockdev_removechildren(params):
166
    """Remove a child from a mirror device.
167

    
168
    This is only valid for mirror devices, of course. It's the callers
169
    duty to send a correct disk, otherwise we raise an error.
170

    
171
    """
172
    bdev_s, ndev_s = params
173
    bdev = objects.Disk.FromDict(bdev_s)
174
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
175
    if bdev is None or ndevs.count(None) > 0:
176
      raise ValueError("can't unserialize data!")
177
    return backend.MirrorRemoveChildren(bdev, ndevs)
178

    
179
  @staticmethod
180
  def perspective_blockdev_getmirrorstatus(params):
181
    """Return the mirror status for a list of disks.
182

    
183
    """
184
    disks = [objects.Disk.FromDict(dsk_s)
185
            for dsk_s in params]
186
    return backend.GetMirrorStatus(disks)
187

    
188
  @staticmethod
189
  def perspective_blockdev_find(params):
190
    """Expose the FindBlockDevice functionality for a disk.
191

    
192
    This will try to find but not activate a disk.
193

    
194
    """
195
    disk = objects.Disk.FromDict(params[0])
196
    return backend.FindBlockDevice(disk)
197

    
198
  @staticmethod
199
  def perspective_blockdev_snapshot(params):
200
    """Create a snapshot device.
201

    
202
    Note that this is only valid for LVM disks, if we get passed
203
    something else we raise an exception. The snapshot device can be
204
    remove by calling the generic block device remove call.
205

    
206
    """
207
    cfbd = objects.Disk.FromDict(params[0])
208
    return backend.SnapshotBlockDevice(cfbd)
209

    
210
  @staticmethod
211
  def perspective_blockdev_grow(params):
212
    """Grow a stack of devices.
213

    
214
    """
215
    cfbd = objects.Disk.FromDict(params[0])
216
    amount = params[1]
217
    return backend.GrowBlockDevice(cfbd, amount)
218

    
219
  @staticmethod
220
  def perspective_blockdev_close(params):
221
    """Closes the given block devices.
222

    
223
    """
224
    disks = [objects.Disk.FromDict(cf) for cf in params]
225
    return backend.CloseBlockDevices(disks)
226

    
227
  # export/import  --------------------------
228

    
229
  @staticmethod
230
  def perspective_snapshot_export(params):
231
    """Export a given snapshot.
232

    
233
    """
234
    disk = objects.Disk.FromDict(params[0])
235
    dest_node = params[1]
236
    instance = objects.Instance.FromDict(params[2])
237
    return backend.ExportSnapshot(disk, dest_node, instance)
238

    
239
  @staticmethod
240
  def perspective_finalize_export(params):
241
    """Expose the finalize export functionality.
242

    
243
    """
244
    instance = objects.Instance.FromDict(params[0])
245
    snap_disks = [objects.Disk.FromDict(str_data)
246
                  for str_data in params[1]]
247
    return backend.FinalizeExport(instance, snap_disks)
248

    
249
  @staticmethod
250
  def perspective_export_info(params):
251
    """Query information about an existing export on this node.
252

    
253
    The given path may not contain an export, in which case we return
254
    None.
255

    
256
    """
257
    path = params[0]
258
    einfo = backend.ExportInfo(path)
259
    if einfo is None:
260
      return einfo
261
    return einfo.Dumps()
262

    
263
  @staticmethod
264
  def perspective_export_list(params):
265
    """List the available exports on this node.
266

    
267
    Note that as opposed to export_info, which may query data about an
268
    export in any path, this only queries the standard Ganeti path
269
    (constants.EXPORT_DIR).
270

    
271
    """
272
    return backend.ListExports()
273

    
274
  @staticmethod
275
  def perspective_export_remove(params):
276
    """Remove an export.
277

    
278
    """
279
    export = params[0]
280
    return backend.RemoveExport(export)
281

    
282
  # volume  --------------------------
283

    
284
  @staticmethod
285
  def perspective_volume_list(params):
286
    """Query the list of logical volumes in a given volume group.
287

    
288
    """
289
    vgname = params[0]
290
    return backend.GetVolumeList(vgname)
291

    
292
  @staticmethod
293
  def perspective_vg_list(params):
294
    """Query the list of volume groups.
295

    
296
    """
297
    return backend.ListVolumeGroups()
298

    
299
  # bridge  --------------------------
300

    
301
  @staticmethod
302
  def perspective_bridges_exist(params):
303
    """Check if all bridges given exist on this node.
304

    
305
    """
306
    bridges_list = params[0]
307
    return backend.BridgesExist(bridges_list)
308

    
309
  # instance  --------------------------
310

    
311
  @staticmethod
312
  def perspective_instance_os_add(params):
313
    """Install an OS on a given instance.
314

    
315
    """
316
    inst_s, os_disk, swap_disk = params
317
    inst = objects.Instance.FromDict(inst_s)
318
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
319

    
320
  @staticmethod
321
  def perspective_instance_run_rename(params):
322
    """Runs the OS rename script for an instance.
323

    
324
    """
325
    inst_s, old_name, os_disk, swap_disk = params
326
    inst = objects.Instance.FromDict(inst_s)
327
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
328

    
329
  @staticmethod
330
  def perspective_instance_os_import(params):
331
    """Run the import function of an OS onto a given instance.
332

    
333
    """
334
    inst_s, os_disk, swap_disk, src_node, src_image = params
335
    inst = objects.Instance.FromDict(inst_s)
336
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
337
                                        src_node, src_image)
338

    
339
  @staticmethod
340
  def perspective_instance_shutdown(params):
341
    """Shutdown an instance.
342

    
343
    """
344
    instance = objects.Instance.FromDict(params[0])
345
    return backend.ShutdownInstance(instance)
346

    
347
  @staticmethod
348
  def perspective_instance_start(params):
349
    """Start an instance.
350

    
351
    """
352
    instance = objects.Instance.FromDict(params[0])
353
    extra_args = params[1]
354
    return backend.StartInstance(instance, extra_args)
355

    
356
  @staticmethod
357
  def perspective_instance_migrate(params):
358
    """Migrates an instance.
359

    
360
    """
361
    instance, target, live = params
362
    return backend.MigrateInstance(instance, target, live)
363

    
364
  @staticmethod
365
  def perspective_instance_reboot(params):
366
    """Reboot an instance.
367

    
368
    """
369
    instance = objects.Instance.FromDict(params[0])
370
    reboot_type = params[1]
371
    extra_args = params[2]
372
    return backend.RebootInstance(instance, reboot_type, extra_args)
373

    
374
  @staticmethod
375
  def perspective_instance_info(params):
376
    """Query instance information.
377

    
378
    """
379
    return backend.GetInstanceInfo(params[0])
380

    
381
  @staticmethod
382
  def perspective_all_instances_info(params):
383
    """Query information about all instances.
384

    
385
    """
386
    return backend.GetAllInstancesInfo()
387

    
388
  @staticmethod
389
  def perspective_instance_list(params):
390
    """Query the list of running instances.
391

    
392
    """
393
    return backend.GetInstanceList()
394

    
395
  # node --------------------------
396

    
397
  @staticmethod
398
  def perspective_node_tcp_ping(params):
399
    """Do a TcpPing on the remote node.
400

    
401
    """
402
    return utils.TcpPing(params[1], params[2], timeout=params[3],
403
                         live_port_needed=params[4], source=params[0])
404

    
405
  @staticmethod
406
  def perspective_node_info(params):
407
    """Query node information.
408

    
409
    """
410
    vgname = params[0]
411
    return backend.GetNodeInfo(vgname)
412

    
413
  @staticmethod
414
  def perspective_node_add(params):
415
    """Complete the registration of this node in the cluster.
416

    
417
    """
418
    return backend.AddNode(params[0], params[1], params[2],
419
                           params[3], params[4], params[5])
420

    
421
  @staticmethod
422
  def perspective_node_verify(params):
423
    """Run a verify sequence on this node.
424

    
425
    """
426
    return backend.VerifyNode(params[0])
427

    
428
  @staticmethod
429
  def perspective_node_start_master(params):
430
    """Promote this node to master status.
431

    
432
    """
433
    return backend.StartMaster(params[0])
434

    
435
  @staticmethod
436
  def perspective_node_stop_master(params):
437
    """Demote this node from master status.
438

    
439
    """
440
    return backend.StopMaster(params[0])
441

    
442
  @staticmethod
443
  def perspective_node_leave_cluster(params):
444
    """Cleanup after leaving a cluster.
445

    
446
    """
447
    return backend.LeaveCluster()
448

    
449
  @staticmethod
450
  def perspective_node_volumes(params):
451
    """Query the list of all logical volume groups.
452

    
453
    """
454
    return backend.NodeVolumes()
455

    
456
  # cluster --------------------------
457

    
458
  @staticmethod
459
  def perspective_version(params):
460
    """Query version information.
461

    
462
    """
463
    return constants.PROTOCOL_VERSION
464

    
465
  @staticmethod
466
  def perspective_upload_file(params):
467
    """Upload a file.
468

    
469
    Note that the backend implementation imposes strict rules on which
470
    files are accepted.
471

    
472
    """
473
    return backend.UploadFile(*params)
474

    
475

    
476
  # os -----------------------
477

    
478
  @staticmethod
479
  def perspective_os_diagnose(params):
480
    """Query detailed information about existing OSes.
481

    
482
    """
483
    return [os.ToDict() for os in backend.DiagnoseOS()]
484

    
485
  @staticmethod
486
  def perspective_os_get(params):
487
    """Query information about a given OS.
488

    
489
    """
490
    name = params[0]
491
    try:
492
      os_obj = backend.OSFromDisk(name)
493
    except errors.InvalidOS, err:
494
      os_obj = objects.OS.FromInvalidOS(err)
495
    return os_obj.ToDict()
496

    
497
  # hooks -----------------------
498

    
499
  @staticmethod
500
  def perspective_hooks_runner(params):
501
    """Run hook scripts.
502

    
503
    """
504
    hpath, phase, env = params
505
    hr = backend.HooksRunner()
506
    return hr.RunHooks(hpath, phase, env)
507

    
508
  # iallocator -----------------
509

    
510
  @staticmethod
511
  def perspective_iallocator_runner(params):
512
    """Run an iallocator script.
513

    
514
    """
515
    name, idata = params
516
    iar = backend.IAllocatorRunner()
517
    return iar.Run(name, idata)
518

    
519
  # test -----------------------
520

    
521
  @staticmethod
522
  def perspective_test_delay(params):
523
    """Run test delay.
524

    
525
    """
526
    duration = params[0]
527
    return utils.TestDelay(duration)
528

    
529
  @staticmethod
530
  def perspective_file_storage_dir_create(params):
531
    """Create the file storage directory.
532

    
533
    """
534
    file_storage_dir = params[0]
535
    return backend.CreateFileStorageDir(file_storage_dir)
536

    
537
  @staticmethod
538
  def perspective_file_storage_dir_remove(params):
539
    """Remove the file storage directory.
540

    
541
    """
542
    file_storage_dir = params[0]
543
    return backend.RemoveFileStorageDir(file_storage_dir)
544

    
545
  @staticmethod
546
  def perspective_file_storage_dir_rename(params):
547
    """Rename the file storage directory.
548

    
549
    """
550
    old_file_storage_dir = params[0]
551
    new_file_storage_dir = params[1]
552
    return backend.RenameFileStorageDir(old_file_storage_dir,
553
                                        new_file_storage_dir)
554

    
555
  @staticmethod
556
  @_RequireJobQueueLock
557
  def perspective_jobqueue_update(params):
558
    """Update job queue.
559

    
560
    """
561
    (file_name, content) = params
562
    return backend.JobQueueUpdate(file_name, content)
563

    
564
  @staticmethod
565
  @_RequireJobQueueLock
566
  def perspective_jobqueue_purge(params):
567
    """Purge job queue.
568

    
569
    """
570
    return backend.JobQueuePurge()
571

    
572
  @staticmethod
573
  @_RequireJobQueueLock
574
  def perspective_jobqueue_rename(params):
575
    """Rename a job queue file.
576

    
577
    """
578
    (old, new) = params
579

    
580
    return backend.JobQueueRename(old, new)
581

    
582

    
583
class NodeDaemonHttpServer(http.HTTPServer):
584
  def __init__(self, server_address):
585
    http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
586
    self.noded_pid = os.getpid()
587

    
588
  def serve_forever(self):
589
    """Handle requests until told to quit."""
590
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
591
    try:
592
      while not sighandler.called:
593
        self.handle_request()
594
      # TODO: There could be children running at this point
595
    finally:
596
      sighandler.Reset()
597

    
598

    
599
class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
600
  """Forking HTTP Server.
601

    
602
  This inherits from ForkingMixIn and HTTPServer in order to fork for each
603
  request we handle. This allows more requests to be handled concurrently.
604

    
605
  """
606

    
607

    
608
def ParseOptions():
609
  """Parse the command line options.
610

    
611
  Returns:
612
    (options, args) as from OptionParser.parse_args()
613

    
614
  """
615
  parser = OptionParser(description="Ganeti node daemon",
616
                        usage="%prog [-f] [-d]",
617
                        version="%%prog (ganeti) %s" %
618
                        constants.RELEASE_VERSION)
619

    
620
  parser.add_option("-f", "--foreground", dest="fork",
621
                    help="Don't detach from the current terminal",
622
                    default=True, action="store_false")
623
  parser.add_option("-d", "--debug", dest="debug",
624
                    help="Enable some debug messages",
625
                    default=False, action="store_true")
626
  options, args = parser.parse_args()
627
  return options, args
628

    
629

    
630
def main():
631
  """Main function for the node daemon.
632

    
633
  """
634
  global queue_lock
635

    
636
  options, args = ParseOptions()
637
  utils.debug = options.debug
638
  for fname in (constants.SSL_CERT_FILE,):
639
    if not os.path.isfile(fname):
640
      print "config %s not there, will not run." % fname
641
      sys.exit(5)
642

    
643
  try:
644
    ss = ssconf.SimpleStore()
645
    port = ss.GetNodeDaemonPort()
646
    pwdata = ss.GetNodeDaemonPassword()
647
  except errors.ConfigurationError, err:
648
    print "Cluster configuration incomplete: '%s'" % str(err)
649
    sys.exit(5)
650

    
651
  # create the various SUB_RUN_DIRS, if not existing, so that we handle the
652
  # situation where RUN_DIR is tmpfs
653
  for dir_name in constants.SUB_RUN_DIRS:
654
    if not os.path.exists(dir_name):
655
      try:
656
        os.mkdir(dir_name, 0755)
657
      except EnvironmentError, err:
658
        if err.errno != errno.EEXIST:
659
          print ("Node setup wrong, cannot create directory %s: %s" %
660
                 (dir_name, err))
661
          sys.exit(5)
662
    if not os.path.isdir(dir_name):
663
      print ("Node setup wrong, %s is not a directory" % dir_name)
664
      sys.exit(5)
665

    
666
  # become a daemon
667
  if options.fork:
668
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
669

    
670
  utils.WritePidFile(constants.NODED_PID)
671

    
672
  logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
673
                      stderr_logging=not options.fork)
674
  logging.info("ganeti node daemon startup")
675

    
676
  # Prepare job queue
677
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
678

    
679
  if options.fork:
680
    server = ForkingHTTPServer(('', port))
681
  else:
682
    server = NodeDaemonHttpServer(('', port))
683

    
684
  try:
685
    server.serve_forever()
686
  finally:
687
    utils.RemovePidFile(constants.NODED_PID)
688

    
689

    
690
if __name__ == '__main__':
691
  main()