Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 8594f271

History | View | Annotate | Download (17.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import logger
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import errors
42
from ganeti import jstore
43
from ganeti import http
44
from ganeti import utils
45

    
46

    
47
# Job queue lock object; assigned in main() from
# jstore.InitAndVerifyQueue() and used by the _RequireJobQueueLock decorator
queue_lock = None
48

    
49

    
50
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  The decorated function is run while the module-level queue_lock is
  held in exclusive mode; the lock is released again even if the
  function raises.

  Args:
    fn: the function to run under the job queue lock

  Returns:
    a wrapper taking the same arguments as fn and returning its result

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  # Keep the name and docstring of the decorated function visible on the
  # wrapper (copied by hand so that no additional module is needed)
  wrapper.__name__ = getattr(fn, "__name__", wrapper.__name__)
  wrapper.__doc__ = getattr(fn, "__doc__", None)

  return wrapper
66

    
67

    
68
class NodeDaemonRequestHandler(http.HTTPRequestHandler):
69
  """The server implementation.
70

    
71
  This class holds all methods exposed over the RPC interface.
72

    
73
  """
74
  def HandleRequest(self):
75
    """Handle a request.
76

    
77
    """
78
    if self.command.upper() != "PUT":
79
      raise http.HTTPBadRequest()
80

    
81
    path = self.path
82
    if path.startswith("/"):
83
      path = path[1:]
84

    
85
    method = getattr(self, "perspective_%s" % path, None)
86
    if method is None:
87
      raise httperror.HTTPNotFound()
88

    
89
    try:
90
      try:
91
        return method(self.post_data)
92
      except:
93
        logging.exception("Error in RPC call")
94
        raise
95
    except errors.QuitGanetiException, err:
96
      # Tell parent to quit
97
      os.kill(self.server.noded_pid, signal.SIGTERM)
98

    
99
  # the new block devices  --------------------------
100

    
101
  @staticmethod
102
  def perspective_blockdev_create(params):
103
    """Create a block device.
104

    
105
    """
106
    bdev_s, size, owner, on_primary, info = params
107
    bdev = objects.Disk.FromDict(bdev_s)
108
    if bdev is None:
109
      raise ValueError("can't unserialize data!")
110
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
111

    
112
  @staticmethod
113
  def perspective_blockdev_remove(params):
114
    """Remove a block device.
115

    
116
    """
117
    bdev_s = params[0]
118
    bdev = objects.Disk.FromDict(bdev_s)
119
    return backend.RemoveBlockDevice(bdev)
120

    
121
  @staticmethod
122
  def perspective_blockdev_rename(params):
123
    """Remove a block device.
124

    
125
    """
126
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
127
    return backend.RenameBlockDevices(devlist)
128

    
129
  @staticmethod
130
  def perspective_blockdev_assemble(params):
131
    """Assemble a block device.
132

    
133
    """
134
    bdev_s, owner, on_primary = params
135
    bdev = objects.Disk.FromDict(bdev_s)
136
    if bdev is None:
137
      raise ValueError("can't unserialize data!")
138
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
139

    
140
  @staticmethod
141
  def perspective_blockdev_shutdown(params):
142
    """Shutdown a block device.
143

    
144
    """
145
    bdev_s = params[0]
146
    bdev = objects.Disk.FromDict(bdev_s)
147
    if bdev is None:
148
      raise ValueError("can't unserialize data!")
149
    return backend.ShutdownBlockDevice(bdev)
150

    
151
  @staticmethod
152
  def perspective_blockdev_addchildren(params):
153
    """Add a child to a mirror device.
154

    
155
    Note: this is only valid for mirror devices. It's the caller's duty
156
    to send a correct disk, otherwise we raise an error.
157

    
158
    """
159
    bdev_s, ndev_s = params
160
    bdev = objects.Disk.FromDict(bdev_s)
161
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
162
    if bdev is None or ndevs.count(None) > 0:
163
      raise ValueError("can't unserialize data!")
164
    return backend.MirrorAddChildren(bdev, ndevs)
165

    
166
  @staticmethod
167
  def perspective_blockdev_removechildren(params):
168
    """Remove a child from a mirror device.
169

    
170
    This is only valid for mirror devices, of course. It's the callers
171
    duty to send a correct disk, otherwise we raise an error.
172

    
173
    """
174
    bdev_s, ndev_s = params
175
    bdev = objects.Disk.FromDict(bdev_s)
176
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
177
    if bdev is None or ndevs.count(None) > 0:
178
      raise ValueError("can't unserialize data!")
179
    return backend.MirrorRemoveChildren(bdev, ndevs)
180

    
181
  @staticmethod
182
  def perspective_blockdev_getmirrorstatus(params):
183
    """Return the mirror status for a list of disks.
184

    
185
    """
186
    disks = [objects.Disk.FromDict(dsk_s)
187
            for dsk_s in params]
188
    return backend.GetMirrorStatus(disks)
189

    
190
  @staticmethod
191
  def perspective_blockdev_find(params):
192
    """Expose the FindBlockDevice functionality for a disk.
193

    
194
    This will try to find but not activate a disk.
195

    
196
    """
197
    disk = objects.Disk.FromDict(params[0])
198
    return backend.FindBlockDevice(disk)
199

    
200
  @staticmethod
201
  def perspective_blockdev_snapshot(params):
202
    """Create a snapshot device.
203

    
204
    Note that this is only valid for LVM disks, if we get passed
205
    something else we raise an exception. The snapshot device can be
206
    remove by calling the generic block device remove call.
207

    
208
    """
209
    cfbd = objects.Disk.FromDict(params[0])
210
    return backend.SnapshotBlockDevice(cfbd)
211

    
212
  @staticmethod
213
  def perspective_blockdev_grow(params):
214
    """Grow a stack of devices.
215

    
216
    """
217
    cfbd = objects.Disk.FromDict(params[0])
218
    amount = params[1]
219
    return backend.GrowBlockDevice(cfbd, amount)
220

    
221
  @staticmethod
222
  def perspective_blockdev_close(params):
223
    """Closes the given block devices.
224

    
225
    """
226
    disks = [objects.Disk.FromDict(cf) for cf in params]
227
    return backend.CloseBlockDevices(disks)
228

    
229
  # export/import  --------------------------
230

    
231
  @staticmethod
232
  def perspective_snapshot_export(params):
233
    """Export a given snapshot.
234

    
235
    """
236
    disk = objects.Disk.FromDict(params[0])
237
    dest_node = params[1]
238
    instance = objects.Instance.FromDict(params[2])
239
    return backend.ExportSnapshot(disk, dest_node, instance)
240

    
241
  @staticmethod
242
  def perspective_finalize_export(params):
243
    """Expose the finalize export functionality.
244

    
245
    """
246
    instance = objects.Instance.FromDict(params[0])
247
    snap_disks = [objects.Disk.FromDict(str_data)
248
                  for str_data in params[1]]
249
    return backend.FinalizeExport(instance, snap_disks)
250

    
251
  @staticmethod
252
  def perspective_export_info(params):
253
    """Query information about an existing export on this node.
254

    
255
    The given path may not contain an export, in which case we return
256
    None.
257

    
258
    """
259
    path = params[0]
260
    einfo = backend.ExportInfo(path)
261
    if einfo is None:
262
      return einfo
263
    return einfo.Dumps()
264

    
265
  @staticmethod
266
  def perspective_export_list(params):
267
    """List the available exports on this node.
268

    
269
    Note that as opposed to export_info, which may query data about an
270
    export in any path, this only queries the standard Ganeti path
271
    (constants.EXPORT_DIR).
272

    
273
    """
274
    return backend.ListExports()
275

    
276
  @staticmethod
277
  def perspective_export_remove(params):
278
    """Remove an export.
279

    
280
    """
281
    export = params[0]
282
    return backend.RemoveExport(export)
283

    
284
  # volume  --------------------------
285

    
286
  @staticmethod
287
  def perspective_volume_list(params):
288
    """Query the list of logical volumes in a given volume group.
289

    
290
    """
291
    vgname = params[0]
292
    return backend.GetVolumeList(vgname)
293

    
294
  @staticmethod
295
  def perspective_vg_list(params):
296
    """Query the list of volume groups.
297

    
298
    """
299
    return backend.ListVolumeGroups()
300

    
301
  # bridge  --------------------------
302

    
303
  @staticmethod
304
  def perspective_bridges_exist(params):
305
    """Check if all bridges given exist on this node.
306

    
307
    """
308
    bridges_list = params[0]
309
    return backend.BridgesExist(bridges_list)
310

    
311
  # instance  --------------------------
312

    
313
  @staticmethod
314
  def perspective_instance_os_add(params):
315
    """Install an OS on a given instance.
316

    
317
    """
318
    inst_s, os_disk, swap_disk = params
319
    inst = objects.Instance.FromDict(inst_s)
320
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
321

    
322
  @staticmethod
323
  def perspective_instance_run_rename(params):
324
    """Runs the OS rename script for an instance.
325

    
326
    """
327
    inst_s, old_name, os_disk, swap_disk = params
328
    inst = objects.Instance.FromDict(inst_s)
329
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
330

    
331
  @staticmethod
332
  def perspective_instance_os_import(params):
333
    """Run the import function of an OS onto a given instance.
334

    
335
    """
336
    inst_s, os_disk, swap_disk, src_node, src_image = params
337
    inst = objects.Instance.FromDict(inst_s)
338
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
339
                                        src_node, src_image)
340

    
341
  @staticmethod
342
  def perspective_instance_shutdown(params):
343
    """Shutdown an instance.
344

    
345
    """
346
    instance = objects.Instance.FromDict(params[0])
347
    return backend.ShutdownInstance(instance)
348

    
349
  @staticmethod
350
  def perspective_instance_start(params):
351
    """Start an instance.
352

    
353
    """
354
    instance = objects.Instance.FromDict(params[0])
355
    extra_args = params[1]
356
    return backend.StartInstance(instance, extra_args)
357

    
358
  @staticmethod
359
  def perspective_instance_migrate(params):
360
    """Migrates an instance.
361

    
362
    """
363
    instance, target, live = params
364
    return backend.MigrateInstance(instance, target, live)
365

    
366
  @staticmethod
367
  def perspective_instance_reboot(params):
368
    """Reboot an instance.
369

    
370
    """
371
    instance = objects.Instance.FromDict(params[0])
372
    reboot_type = params[1]
373
    extra_args = params[2]
374
    return backend.RebootInstance(instance, reboot_type, extra_args)
375

    
376
  @staticmethod
377
  def perspective_instance_info(params):
378
    """Query instance information.
379

    
380
    """
381
    return backend.GetInstanceInfo(params[0])
382

    
383
  @staticmethod
384
  def perspective_all_instances_info(params):
385
    """Query information about all instances.
386

    
387
    """
388
    return backend.GetAllInstancesInfo()
389

    
390
  @staticmethod
391
  def perspective_instance_list(params):
392
    """Query the list of running instances.
393

    
394
    """
395
    return backend.GetInstanceList()
396

    
397
  # node --------------------------
398

    
399
  @staticmethod
400
  def perspective_node_tcp_ping(params):
401
    """Do a TcpPing on the remote node.
402

    
403
    """
404
    return utils.TcpPing(params[1], params[2], timeout=params[3],
405
                         live_port_needed=params[4], source=params[0])
406

    
407
  @staticmethod
408
  def perspective_node_info(params):
409
    """Query node information.
410

    
411
    """
412
    vgname = params[0]
413
    return backend.GetNodeInfo(vgname)
414

    
415
  @staticmethod
416
  def perspective_node_add(params):
417
    """Complete the registration of this node in the cluster.
418

    
419
    """
420
    return backend.AddNode(params[0], params[1], params[2],
421
                           params[3], params[4], params[5])
422

    
423
  @staticmethod
424
  def perspective_node_verify(params):
425
    """Run a verify sequence on this node.
426

    
427
    """
428
    return backend.VerifyNode(params[0])
429

    
430
  @staticmethod
431
  def perspective_node_start_master(params):
432
    """Promote this node to master status.
433

    
434
    """
435
    return backend.StartMaster(params[0])
436

    
437
  @staticmethod
438
  def perspective_node_stop_master(params):
439
    """Demote this node from master status.
440

    
441
    """
442
    return backend.StopMaster(params[0])
443

    
444
  @staticmethod
445
  def perspective_node_leave_cluster(params):
446
    """Cleanup after leaving a cluster.
447

    
448
    """
449
    return backend.LeaveCluster()
450

    
451
  @staticmethod
452
  def perspective_node_volumes(params):
453
    """Query the list of all logical volume groups.
454

    
455
    """
456
    return backend.NodeVolumes()
457

    
458
  # cluster --------------------------
459

    
460
  @staticmethod
461
  def perspective_version(params):
462
    """Query version information.
463

    
464
    """
465
    return constants.PROTOCOL_VERSION
466

    
467
  @staticmethod
468
  def perspective_upload_file(params):
469
    """Upload a file.
470

    
471
    Note that the backend implementation imposes strict rules on which
472
    files are accepted.
473

    
474
    """
475
    return backend.UploadFile(*params)
476

    
477
  @staticmethod
478
  def perspective_master_info(params):
479
    """Query master information.
480

    
481
    """
482
    return backend.GetMasterInfo()
483

    
484
  # os -----------------------
485

    
486
  @staticmethod
487
  def perspective_os_diagnose(params):
488
    """Query detailed information about existing OSes.
489

    
490
    """
491
    return [os.ToDict() for os in backend.DiagnoseOS()]
492

    
493
  @staticmethod
494
  def perspective_os_get(params):
495
    """Query information about a given OS.
496

    
497
    """
498
    name = params[0]
499
    try:
500
      os_obj = backend.OSFromDisk(name)
501
    except errors.InvalidOS, err:
502
      os_obj = objects.OS.FromInvalidOS(err)
503
    return os_obj.ToDict()
504

    
505
  # hooks -----------------------
506

    
507
  @staticmethod
508
  def perspective_hooks_runner(params):
509
    """Run hook scripts.
510

    
511
    """
512
    hpath, phase, env = params
513
    hr = backend.HooksRunner()
514
    return hr.RunHooks(hpath, phase, env)
515

    
516
  # iallocator -----------------
517

    
518
  @staticmethod
519
  def perspective_iallocator_runner(params):
520
    """Run an iallocator script.
521

    
522
    """
523
    name, idata = params
524
    iar = backend.IAllocatorRunner()
525
    return iar.Run(name, idata)
526

    
527
  # test -----------------------
528

    
529
  @staticmethod
530
  def perspective_test_delay(params):
531
    """Run test delay.
532

    
533
    """
534
    duration = params[0]
535
    return utils.TestDelay(duration)
536

    
537
  # file storage ---------------
538

    
539
  @staticmethod
540
  def perspective_file_storage_dir_create(params):
541
    """Create the file storage directory.
542

    
543
    """
544
    file_storage_dir = params[0]
545
    return backend.CreateFileStorageDir(file_storage_dir)
546

    
547
  @staticmethod
548
  def perspective_file_storage_dir_remove(params):
549
    """Remove the file storage directory.
550

    
551
    """
552
    file_storage_dir = params[0]
553
    return backend.RemoveFileStorageDir(file_storage_dir)
554

    
555
  @staticmethod
556
  def perspective_file_storage_dir_rename(params):
557
    """Rename the file storage directory.
558

    
559
    """
560
    old_file_storage_dir = params[0]
561
    new_file_storage_dir = params[1]
562
    return backend.RenameFileStorageDir(old_file_storage_dir,
563
                                        new_file_storage_dir)
564

    
565
  # jobs ------------------------
566

    
567
  @staticmethod
568
  @_RequireJobQueueLock
569
  def perspective_jobqueue_update(params):
570
    """Update job queue.
571

    
572
    """
573
    (file_name, content) = params
574
    return backend.JobQueueUpdate(file_name, content)
575

    
576
  @staticmethod
577
  @_RequireJobQueueLock
578
  def perspective_jobqueue_purge(params):
579
    """Purge job queue.
580

    
581
    """
582
    return backend.JobQueuePurge()
583

    
584
  @staticmethod
585
  @_RequireJobQueueLock
586
  def perspective_jobqueue_rename(params):
587
    """Rename a job queue file.
588

    
589
    """
590
    (old, new) = params
591

    
592
    return backend.JobQueueRename(old, new)
593

    
594

    
595
class NodeDaemonHttpServer(http.HTTPServer):
  """HTTP server handing requests to NodeDaemonRequestHandler.

  The PID of the process creating the server is remembered in the
  noded_pid attribute.

  """
  def __init__(self, server_address):
    http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
    self.noded_pid = os.getpid()

  def serve_forever(self):
    """Handle requests until told to quit.

    A signal handler is installed for SIGINT and SIGTERM; requests are
    handled one at a time until that handler reports having been called.
    The handler is reset before returning.

    """
    quit_signals = [signal.SIGINT, signal.SIGTERM]
    handler = utils.SignalHandler(quit_signals)
    try:
      while True:
        if handler.called:
          break
        self.handle_request()
      # TODO: There could be children running at this point
    finally:
      handler.Reset()
609

    
610

    
611
class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
  """Forking HTTP Server.

  SocketServer.ForkingMixIn is combined with NodeDaemonHttpServer in order
  to fork for each request we handle, which allows more requests to be
  handled concurrently.

  """
618

    
619

    
620
def ParseOptions():
  """Parse the command line options.

  Two flags are known: -f/--foreground (stored as False in options.fork,
  which defaults to True) and -d/--debug (stored as True in options.debug,
  which defaults to False).

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d]",
                        version=version_string)

  parser.add_option("-f", "--foreground", dest="fork",
                    action="store_false", default=True,
                    help="Don't detach from the current terminal")
  parser.add_option("-d", "--debug", dest="debug",
                    action="store_true", default=False,
                    help="Enable some debug messages")
  return parser.parse_args()
640

    
641

    
642
def main():
643
  """Main function for the node daemon.
644

    
645
  """
646
  global queue_lock
647

    
648
  options, args = ParseOptions()
649
  utils.debug = options.debug
650
  for fname in (constants.SSL_CERT_FILE,):
651
    if not os.path.isfile(fname):
652
      print "config %s not there, will not run." % fname
653
      sys.exit(5)
654

    
655
  try:
656
    port = utils.GetNodeDaemonPort()
657
    pwdata = utils.GetNodeDaemonPassword()
658
  except errors.ConfigurationError, err:
659
    print "Cluster configuration incomplete: '%s'" % str(err)
660
    sys.exit(5)
661

    
662
  # create the various SUB_RUN_DIRS, if not existing, so that we handle the
663
  # situation where RUN_DIR is tmpfs
664
  for dir_name in constants.SUB_RUN_DIRS:
665
    if not os.path.exists(dir_name):
666
      try:
667
        os.mkdir(dir_name, 0755)
668
      except EnvironmentError, err:
669
        if err.errno != errno.EEXIST:
670
          print ("Node setup wrong, cannot create directory %s: %s" %
671
                 (dir_name, err))
672
          sys.exit(5)
673
    if not os.path.isdir(dir_name):
674
      print ("Node setup wrong, %s is not a directory" % dir_name)
675
      sys.exit(5)
676

    
677
  # become a daemon
678
  if options.fork:
679
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
680

    
681
  utils.WritePidFile(constants.NODED_PID)
682

    
683
  logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
684
                      stderr_logging=not options.fork)
685
  logging.info("ganeti node daemon startup")
686

    
687
  # Prepare job queue
688
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
689

    
690
  if options.fork:
691
    server = ForkingHTTPServer(('', port))
692
  else:
693
    server = NodeDaemonHttpServer(('', port))
694

    
695
  try:
696
    server.serve_forever()
697
  finally:
698
    utils.RemovePidFile(constants.NODED_PID)
699

    
700

    
701
# Start the daemon only when this file is run as a script
if __name__ == "__main__":
  main()