Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 4e071d3b

History | View | Annotate | Download (17.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import logger
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import errors
42
from ganeti import jstore
43
from ganeti import ssconf
44
from ganeti import http
45
from ganeti import utils
46

    
47

    
48
queue_lock = None
49

    
50

    
51
def _RequireJobQueueLock(fn):
52
  """Decorator for job queue manipulating functions.
53

    
54
  """
55
  def wrapper(*args, **kwargs):
56
    # Locking in exclusive, blocking mode because there could be several
57
    # children running at the same time. Waiting up to 10 seconds.
58
    queue_lock.Exclusive(blocking=True, timeout=10)
59
    try:
60
      return fn(*args, **kwargs)
61
    finally:
62
      queue_lock.Unlock()
63
  return wrapper
64

    
65

    
66
class NodeDaemonRequestHandler(http.HTTPRequestHandler):
67
  """The server implementation.
68

    
69
  This class holds all methods exposed over the RPC interface.
70

    
71
  """
72
  def HandleRequest(self):
73
    """Handle a request.
74

    
75
    """
76
    if self.command.upper() != "PUT":
77
      raise http.HTTPBadRequest()
78

    
79
    path = self.path
80
    if path.startswith("/"):
81
      path = path[1:]
82

    
83
    method = getattr(self, "perspective_%s" % path, None)
84
    if method is None:
85
      raise httperror.HTTPNotFound()
86

    
87
    try:
88
      try:
89
        return method(self.post_data)
90
      except:
91
        logging.exception("Error in RPC call")
92
        raise
93
    except errors.QuitGanetiException, err:
94
      # Tell parent to quit
95
      os.kill(self.server.noded_pid, signal.SIGTERM)
96

    
97
  # the new block devices  --------------------------
98

    
99
  @staticmethod
100
  def perspective_blockdev_create(params):
101
    """Create a block device.
102

    
103
    """
104
    bdev_s, size, owner, on_primary, info = params
105
    bdev = objects.Disk.FromDict(bdev_s)
106
    if bdev is None:
107
      raise ValueError("can't unserialize data!")
108
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
109

    
110
  @staticmethod
111
  def perspective_blockdev_remove(params):
112
    """Remove a block device.
113

    
114
    """
115
    bdev_s = params[0]
116
    bdev = objects.Disk.FromDict(bdev_s)
117
    return backend.RemoveBlockDevice(bdev)
118

    
119
  @staticmethod
120
  def perspective_blockdev_rename(params):
121
    """Remove a block device.
122

    
123
    """
124
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
125
    return backend.RenameBlockDevices(devlist)
126

    
127
  @staticmethod
128
  def perspective_blockdev_assemble(params):
129
    """Assemble a block device.
130

    
131
    """
132
    bdev_s, owner, on_primary = params
133
    bdev = objects.Disk.FromDict(bdev_s)
134
    if bdev is None:
135
      raise ValueError("can't unserialize data!")
136
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
137

    
138
  @staticmethod
139
  def perspective_blockdev_shutdown(params):
140
    """Shutdown a block device.
141

    
142
    """
143
    bdev_s = params[0]
144
    bdev = objects.Disk.FromDict(bdev_s)
145
    if bdev is None:
146
      raise ValueError("can't unserialize data!")
147
    return backend.ShutdownBlockDevice(bdev)
148

    
149
  @staticmethod
150
  def perspective_blockdev_addchildren(params):
151
    """Add a child to a mirror device.
152

    
153
    Note: this is only valid for mirror devices. It's the caller's duty
154
    to send a correct disk, otherwise we raise an error.
155

    
156
    """
157
    bdev_s, ndev_s = params
158
    bdev = objects.Disk.FromDict(bdev_s)
159
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
160
    if bdev is None or ndevs.count(None) > 0:
161
      raise ValueError("can't unserialize data!")
162
    return backend.MirrorAddChildren(bdev, ndevs)
163

    
164
  @staticmethod
165
  def perspective_blockdev_removechildren(params):
166
    """Remove a child from a mirror device.
167

    
168
    This is only valid for mirror devices, of course. It's the callers
169
    duty to send a correct disk, otherwise we raise an error.
170

    
171
    """
172
    bdev_s, ndev_s = params
173
    bdev = objects.Disk.FromDict(bdev_s)
174
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
175
    if bdev is None or ndevs.count(None) > 0:
176
      raise ValueError("can't unserialize data!")
177
    return backend.MirrorRemoveChildren(bdev, ndevs)
178

    
179
  @staticmethod
180
  def perspective_blockdev_getmirrorstatus(params):
181
    """Return the mirror status for a list of disks.
182

    
183
    """
184
    disks = [objects.Disk.FromDict(dsk_s)
185
            for dsk_s in params]
186
    return backend.GetMirrorStatus(disks)
187

    
188
  @staticmethod
189
  def perspective_blockdev_find(params):
190
    """Expose the FindBlockDevice functionality for a disk.
191

    
192
    This will try to find but not activate a disk.
193

    
194
    """
195
    disk = objects.Disk.FromDict(params[0])
196
    return backend.FindBlockDevice(disk)
197

    
198
  @staticmethod
199
  def perspective_blockdev_snapshot(params):
200
    """Create a snapshot device.
201

    
202
    Note that this is only valid for LVM disks, if we get passed
203
    something else we raise an exception. The snapshot device can be
204
    remove by calling the generic block device remove call.
205

    
206
    """
207
    cfbd = objects.Disk.FromDict(params[0])
208
    return backend.SnapshotBlockDevice(cfbd)
209

    
210
  @staticmethod
211
  def perspective_blockdev_grow(params):
212
    """Grow a stack of devices.
213

    
214
    """
215
    cfbd = objects.Disk.FromDict(params[0])
216
    amount = params[1]
217
    return backend.GrowBlockDevice(cfbd, amount)
218

    
219
  @staticmethod
220
  def perspective_blockdev_close(params):
221
    """Closes the given block devices.
222

    
223
    """
224
    disks = [objects.Disk.FromDict(cf) for cf in params]
225
    return backend.CloseBlockDevices(disks)
226

    
227
  # export/import  --------------------------
228

    
229
  @staticmethod
230
  def perspective_snapshot_export(params):
231
    """Export a given snapshot.
232

    
233
    """
234
    disk = objects.Disk.FromDict(params[0])
235
    dest_node = params[1]
236
    instance = objects.Instance.FromDict(params[2])
237
    return backend.ExportSnapshot(disk, dest_node, instance)
238

    
239
  @staticmethod
240
  def perspective_finalize_export(params):
241
    """Expose the finalize export functionality.
242

    
243
    """
244
    instance = objects.Instance.FromDict(params[0])
245
    snap_disks = [objects.Disk.FromDict(str_data)
246
                  for str_data in params[1]]
247
    return backend.FinalizeExport(instance, snap_disks)
248

    
249
  @staticmethod
250
  def perspective_export_info(params):
251
    """Query information about an existing export on this node.
252

    
253
    The given path may not contain an export, in which case we return
254
    None.
255

    
256
    """
257
    path = params[0]
258
    einfo = backend.ExportInfo(path)
259
    if einfo is None:
260
      return einfo
261
    return einfo.Dumps()
262

    
263
  @staticmethod
264
  def perspective_export_list(params):
265
    """List the available exports on this node.
266

    
267
    Note that as opposed to export_info, which may query data about an
268
    export in any path, this only queries the standard Ganeti path
269
    (constants.EXPORT_DIR).
270

    
271
    """
272
    return backend.ListExports()
273

    
274
  @staticmethod
275
  def perspective_export_remove(params):
276
    """Remove an export.
277

    
278
    """
279
    export = params[0]
280
    return backend.RemoveExport(export)
281

    
282
  # volume  --------------------------
283

    
284
  @staticmethod
285
  def perspective_volume_list(params):
286
    """Query the list of logical volumes in a given volume group.
287

    
288
    """
289
    vgname = params[0]
290
    return backend.GetVolumeList(vgname)
291

    
292
  @staticmethod
293
  def perspective_vg_list(params):
294
    """Query the list of volume groups.
295

    
296
    """
297
    return backend.ListVolumeGroups()
298

    
299
  # bridge  --------------------------
300

    
301
  @staticmethod
302
  def perspective_bridges_exist(params):
303
    """Check if all bridges given exist on this node.
304

    
305
    """
306
    bridges_list = params[0]
307
    return backend.BridgesExist(bridges_list)
308

    
309
  # instance  --------------------------
310

    
311
  @staticmethod
312
  def perspective_instance_os_add(params):
313
    """Install an OS on a given instance.
314

    
315
    """
316
    inst_s, os_disk, swap_disk = params
317
    inst = objects.Instance.FromDict(inst_s)
318
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
319

    
320
  @staticmethod
321
  def perspective_instance_run_rename(params):
322
    """Runs the OS rename script for an instance.
323

    
324
    """
325
    inst_s, old_name, os_disk, swap_disk = params
326
    inst = objects.Instance.FromDict(inst_s)
327
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
328

    
329
  @staticmethod
330
  def perspective_instance_os_import(params):
331
    """Run the import function of an OS onto a given instance.
332

    
333
    """
334
    inst_s, os_disk, swap_disk, src_node, src_image = params
335
    inst = objects.Instance.FromDict(inst_s)
336
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
337
                                        src_node, src_image)
338

    
339
  @staticmethod
340
  def perspective_instance_shutdown(params):
341
    """Shutdown an instance.
342

    
343
    """
344
    instance = objects.Instance.FromDict(params[0])
345
    return backend.ShutdownInstance(instance)
346

    
347
  @staticmethod
348
  def perspective_instance_start(params):
349
    """Start an instance.
350

    
351
    """
352
    instance = objects.Instance.FromDict(params[0])
353
    extra_args = params[1]
354
    return backend.StartInstance(instance, extra_args)
355

    
356
  @staticmethod
357
  def perspective_instance_migrate(params):
358
    """Migrates an instance.
359

    
360
    """
361
    instance, target, live = params
362
    return backend.MigrateInstance(instance, target, live)
363

    
364
  @staticmethod
365
  def perspective_instance_reboot(params):
366
    """Reboot an instance.
367

    
368
    """
369
    instance = objects.Instance.FromDict(params[0])
370
    reboot_type = params[1]
371
    extra_args = params[2]
372
    return backend.RebootInstance(instance, reboot_type, extra_args)
373

    
374
  @staticmethod
375
  def perspective_instance_info(params):
376
    """Query instance information.
377

    
378
    """
379
    return backend.GetInstanceInfo(params[0])
380

    
381
  @staticmethod
382
  def perspective_all_instances_info(params):
383
    """Query information about all instances.
384

    
385
    """
386
    return backend.GetAllInstancesInfo()
387

    
388
  @staticmethod
389
  def perspective_instance_list(params):
390
    """Query the list of running instances.
391

    
392
    """
393
    return backend.GetInstanceList()
394

    
395
  # node --------------------------
396

    
397
  @staticmethod
398
  def perspective_node_tcp_ping(params):
399
    """Do a TcpPing on the remote node.
400

    
401
    """
402
    return utils.TcpPing(params[1], params[2], timeout=params[3],
403
                         live_port_needed=params[4], source=params[0])
404

    
405
  @staticmethod
406
  def perspective_node_info(params):
407
    """Query node information.
408

    
409
    """
410
    vgname = params[0]
411
    return backend.GetNodeInfo(vgname)
412

    
413
  @staticmethod
414
  def perspective_node_add(params):
415
    """Complete the registration of this node in the cluster.
416

    
417
    """
418
    return backend.AddNode(params[0], params[1], params[2],
419
                           params[3], params[4], params[5])
420

    
421
  @staticmethod
422
  def perspective_node_verify(params):
423
    """Run a verify sequence on this node.
424

    
425
    """
426
    return backend.VerifyNode(params[0])
427

    
428
  @staticmethod
429
  def perspective_node_start_master(params):
430
    """Promote this node to master status.
431

    
432
    """
433
    return backend.StartMaster(params[0])
434

    
435
  @staticmethod
436
  def perspective_node_stop_master(params):
437
    """Demote this node from master status.
438

    
439
    """
440
    return backend.StopMaster(params[0])
441

    
442
  @staticmethod
443
  def perspective_node_leave_cluster(params):
444
    """Cleanup after leaving a cluster.
445

    
446
    """
447
    return backend.LeaveCluster()
448

    
449
  @staticmethod
450
  def perspective_node_volumes(params):
451
    """Query the list of all logical volume groups.
452

    
453
    """
454
    return backend.NodeVolumes()
455

    
456
  # cluster --------------------------
457

    
458
  @staticmethod
459
  def perspective_version(params):
460
    """Query version information.
461

    
462
    """
463
    return constants.PROTOCOL_VERSION
464

    
465
  @staticmethod
466
  def perspective_upload_file(params):
467
    """Upload a file.
468

    
469
    Note that the backend implementation imposes strict rules on which
470
    files are accepted.
471

    
472
    """
473
    return backend.UploadFile(*params)
474

    
475
  @staticmethod
476
  def perspective_master_info(params):
477
    """Query master information.
478

    
479
    """
480
    return backend.GetMasterInfo()
481

    
482
  # os -----------------------
483

    
484
  @staticmethod
485
  def perspective_os_diagnose(params):
486
    """Query detailed information about existing OSes.
487

    
488
    """
489
    return [os.ToDict() for os in backend.DiagnoseOS()]
490

    
491
  @staticmethod
492
  def perspective_os_get(params):
493
    """Query information about a given OS.
494

    
495
    """
496
    name = params[0]
497
    try:
498
      os_obj = backend.OSFromDisk(name)
499
    except errors.InvalidOS, err:
500
      os_obj = objects.OS.FromInvalidOS(err)
501
    return os_obj.ToDict()
502

    
503
  # hooks -----------------------
504

    
505
  @staticmethod
506
  def perspective_hooks_runner(params):
507
    """Run hook scripts.
508

    
509
    """
510
    hpath, phase, env = params
511
    hr = backend.HooksRunner()
512
    return hr.RunHooks(hpath, phase, env)
513

    
514
  # iallocator -----------------
515

    
516
  @staticmethod
517
  def perspective_iallocator_runner(params):
518
    """Run an iallocator script.
519

    
520
    """
521
    name, idata = params
522
    iar = backend.IAllocatorRunner()
523
    return iar.Run(name, idata)
524

    
525
  # test -----------------------
526

    
527
  @staticmethod
528
  def perspective_test_delay(params):
529
    """Run test delay.
530

    
531
    """
532
    duration = params[0]
533
    return utils.TestDelay(duration)
534

    
535
  # file storage ---------------
536

    
537
  @staticmethod
538
  def perspective_file_storage_dir_create(params):
539
    """Create the file storage directory.
540

    
541
    """
542
    file_storage_dir = params[0]
543
    return backend.CreateFileStorageDir(file_storage_dir)
544

    
545
  @staticmethod
546
  def perspective_file_storage_dir_remove(params):
547
    """Remove the file storage directory.
548

    
549
    """
550
    file_storage_dir = params[0]
551
    return backend.RemoveFileStorageDir(file_storage_dir)
552

    
553
  @staticmethod
554
  def perspective_file_storage_dir_rename(params):
555
    """Rename the file storage directory.
556

    
557
    """
558
    old_file_storage_dir = params[0]
559
    new_file_storage_dir = params[1]
560
    return backend.RenameFileStorageDir(old_file_storage_dir,
561
                                        new_file_storage_dir)
562

    
563
  # jobs ------------------------
564

    
565
  @staticmethod
566
  @_RequireJobQueueLock
567
  def perspective_jobqueue_update(params):
568
    """Update job queue.
569

    
570
    """
571
    (file_name, content) = params
572
    return backend.JobQueueUpdate(file_name, content)
573

    
574
  @staticmethod
575
  @_RequireJobQueueLock
576
  def perspective_jobqueue_purge(params):
577
    """Purge job queue.
578

    
579
    """
580
    return backend.JobQueuePurge()
581

    
582
  @staticmethod
583
  @_RequireJobQueueLock
584
  def perspective_jobqueue_rename(params):
585
    """Rename a job queue file.
586

    
587
    """
588
    (old, new) = params
589

    
590
    return backend.JobQueueRename(old, new)
591

    
592

    
593
class NodeDaemonHttpServer(http.HTTPServer):
594
  def __init__(self, server_address):
595
    http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
596
    self.noded_pid = os.getpid()
597

    
598
  def serve_forever(self):
599
    """Handle requests until told to quit."""
600
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
601
    try:
602
      while not sighandler.called:
603
        self.handle_request()
604
      # TODO: There could be children running at this point
605
    finally:
606
      sighandler.Reset()
607

    
608

    
609
class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
610
  """Forking HTTP Server.
611

    
612
  This inherits from ForkingMixIn and HTTPServer in order to fork for each
613
  request we handle. This allows more requests to be handled concurrently.
614

    
615
  """
616

    
617

    
618
def ParseOptions():
619
  """Parse the command line options.
620

    
621
  Returns:
622
    (options, args) as from OptionParser.parse_args()
623

    
624
  """
625
  parser = OptionParser(description="Ganeti node daemon",
626
                        usage="%prog [-f] [-d]",
627
                        version="%%prog (ganeti) %s" %
628
                        constants.RELEASE_VERSION)
629

    
630
  parser.add_option("-f", "--foreground", dest="fork",
631
                    help="Don't detach from the current terminal",
632
                    default=True, action="store_false")
633
  parser.add_option("-d", "--debug", dest="debug",
634
                    help="Enable some debug messages",
635
                    default=False, action="store_true")
636
  options, args = parser.parse_args()
637
  return options, args
638

    
639

    
640
def main():
641
  """Main function for the node daemon.
642

    
643
  """
644
  global queue_lock
645

    
646
  options, args = ParseOptions()
647
  utils.debug = options.debug
648
  for fname in (constants.SSL_CERT_FILE,):
649
    if not os.path.isfile(fname):
650
      print "config %s not there, will not run." % fname
651
      sys.exit(5)
652

    
653
  try:
654
    ss = ssconf.SimpleStore()
655
    port = ss.GetNodeDaemonPort()
656
    pwdata = ss.GetNodeDaemonPassword()
657
  except errors.ConfigurationError, err:
658
    print "Cluster configuration incomplete: '%s'" % str(err)
659
    sys.exit(5)
660

    
661
  # create the various SUB_RUN_DIRS, if not existing, so that we handle the
662
  # situation where RUN_DIR is tmpfs
663
  for dir_name in constants.SUB_RUN_DIRS:
664
    if not os.path.exists(dir_name):
665
      try:
666
        os.mkdir(dir_name, 0755)
667
      except EnvironmentError, err:
668
        if err.errno != errno.EEXIST:
669
          print ("Node setup wrong, cannot create directory %s: %s" %
670
                 (dir_name, err))
671
          sys.exit(5)
672
    if not os.path.isdir(dir_name):
673
      print ("Node setup wrong, %s is not a directory" % dir_name)
674
      sys.exit(5)
675

    
676
  # become a daemon
677
  if options.fork:
678
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
679

    
680
  utils.WritePidFile(constants.NODED_PID)
681

    
682
  logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
683
                      stderr_logging=not options.fork)
684
  logging.info("ganeti node daemon startup")
685

    
686
  # Prepare job queue
687
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
688

    
689
  if options.fork:
690
    server = ForkingHTTPServer(('', port))
691
  else:
692
    server = NodeDaemonHttpServer(('', port))
693

    
694
  try:
695
    server.serve_forever()
696
  finally:
697
    utils.RemovePidFile(constants.NODED_PID)
698

    
699

    
700
if __name__ == '__main__':
701
  main()