Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 8785cb30

History | View | Annotate | Download (17.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import logger
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import errors
42
from ganeti import jstore
43
from ganeti import ssconf
44
from ganeti import http
45
from ganeti import utils
46

    
47

    
48
# Job queue lock shared by the request handlers. It stays None until main()
# stores the object returned by jstore.InitAndVerifyQueue() in it.
queue_lock = None


def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  The returned wrapper acquires the module-level queue_lock in exclusive,
  blocking mode (waiting at most QUEUE_LOCK_TIMEOUT seconds), calls fn with
  the arguments it was given and releases the lock again, whether fn
  returned or raised.

  Args:
    fn: the callable to run while the job queue lock is held

  Returns:
    a wrapper returning whatever fn returns; it carries the name and the
    docstring of fn so that the decorated function stays identifiable

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    # The lock is taken outside of the try block on purpose: if acquiring
    # it fails there is nothing to unlock.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  # Copy the metadata by hand (functools.wraps is not available on all
  # Python versions this file supports), otherwise every decorated function
  # would show up as "wrapper" without a docstring.
  wrapper.__name__ = getattr(fn, "__name__", wrapper.__name__)
  wrapper.__doc__ = getattr(fn, "__doc__", None)

  return wrapper
67

    
68

    
69
class NodeDaemonRequestHandler(http.HTTPRequestHandler):
70
  """The server implementation.
71

    
72
  This class holds all methods exposed over the RPC interface.
73

    
74
  """
75
  def HandleRequest(self):
76
    """Handle a request.
77

    
78
    """
79
    if self.command.upper() != "PUT":
80
      raise http.HTTPBadRequest()
81

    
82
    path = self.path
83
    if path.startswith("/"):
84
      path = path[1:]
85

    
86
    method = getattr(self, "perspective_%s" % path, None)
87
    if method is None:
88
      raise httperror.HTTPNotFound()
89

    
90
    try:
91
      try:
92
        return method(self.post_data)
93
      except:
94
        logging.exception("Error in RPC call")
95
        raise
96
    except errors.QuitGanetiException, err:
97
      # Tell parent to quit
98
      os.kill(self.server.noded_pid, signal.SIGTERM)
99

    
100
  # the new block devices  --------------------------
101

    
102
  @staticmethod
103
  def perspective_blockdev_create(params):
104
    """Create a block device.
105

    
106
    """
107
    bdev_s, size, owner, on_primary, info = params
108
    bdev = objects.Disk.FromDict(bdev_s)
109
    if bdev is None:
110
      raise ValueError("can't unserialize data!")
111
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
112

    
113
  @staticmethod
114
  def perspective_blockdev_remove(params):
115
    """Remove a block device.
116

    
117
    """
118
    bdev_s = params[0]
119
    bdev = objects.Disk.FromDict(bdev_s)
120
    return backend.RemoveBlockDevice(bdev)
121

    
122
  @staticmethod
123
  def perspective_blockdev_rename(params):
124
    """Remove a block device.
125

    
126
    """
127
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
128
    return backend.RenameBlockDevices(devlist)
129

    
130
  @staticmethod
131
  def perspective_blockdev_assemble(params):
132
    """Assemble a block device.
133

    
134
    """
135
    bdev_s, owner, on_primary = params
136
    bdev = objects.Disk.FromDict(bdev_s)
137
    if bdev is None:
138
      raise ValueError("can't unserialize data!")
139
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
140

    
141
  @staticmethod
142
  def perspective_blockdev_shutdown(params):
143
    """Shutdown a block device.
144

    
145
    """
146
    bdev_s = params[0]
147
    bdev = objects.Disk.FromDict(bdev_s)
148
    if bdev is None:
149
      raise ValueError("can't unserialize data!")
150
    return backend.ShutdownBlockDevice(bdev)
151

    
152
  @staticmethod
153
  def perspective_blockdev_addchildren(params):
154
    """Add a child to a mirror device.
155

    
156
    Note: this is only valid for mirror devices. It's the caller's duty
157
    to send a correct disk, otherwise we raise an error.
158

    
159
    """
160
    bdev_s, ndev_s = params
161
    bdev = objects.Disk.FromDict(bdev_s)
162
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
163
    if bdev is None or ndevs.count(None) > 0:
164
      raise ValueError("can't unserialize data!")
165
    return backend.MirrorAddChildren(bdev, ndevs)
166

    
167
  @staticmethod
168
  def perspective_blockdev_removechildren(params):
169
    """Remove a child from a mirror device.
170

    
171
    This is only valid for mirror devices, of course. It's the callers
172
    duty to send a correct disk, otherwise we raise an error.
173

    
174
    """
175
    bdev_s, ndev_s = params
176
    bdev = objects.Disk.FromDict(bdev_s)
177
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
178
    if bdev is None or ndevs.count(None) > 0:
179
      raise ValueError("can't unserialize data!")
180
    return backend.MirrorRemoveChildren(bdev, ndevs)
181

    
182
  @staticmethod
183
  def perspective_blockdev_getmirrorstatus(params):
184
    """Return the mirror status for a list of disks.
185

    
186
    """
187
    disks = [objects.Disk.FromDict(dsk_s)
188
            for dsk_s in params]
189
    return backend.GetMirrorStatus(disks)
190

    
191
  @staticmethod
192
  def perspective_blockdev_find(params):
193
    """Expose the FindBlockDevice functionality for a disk.
194

    
195
    This will try to find but not activate a disk.
196

    
197
    """
198
    disk = objects.Disk.FromDict(params[0])
199
    return backend.FindBlockDevice(disk)
200

    
201
  @staticmethod
202
  def perspective_blockdev_snapshot(params):
203
    """Create a snapshot device.
204

    
205
    Note that this is only valid for LVM disks, if we get passed
206
    something else we raise an exception. The snapshot device can be
207
    remove by calling the generic block device remove call.
208

    
209
    """
210
    cfbd = objects.Disk.FromDict(params[0])
211
    return backend.SnapshotBlockDevice(cfbd)
212

    
213
  @staticmethod
214
  def perspective_blockdev_grow(params):
215
    """Grow a stack of devices.
216

    
217
    """
218
    cfbd = objects.Disk.FromDict(params[0])
219
    amount = params[1]
220
    return backend.GrowBlockDevice(cfbd, amount)
221

    
222
  @staticmethod
223
  def perspective_blockdev_close(params):
224
    """Closes the given block devices.
225

    
226
    """
227
    disks = [objects.Disk.FromDict(cf) for cf in params]
228
    return backend.CloseBlockDevices(disks)
229

    
230
  # export/import  --------------------------
231

    
232
  @staticmethod
233
  def perspective_snapshot_export(params):
234
    """Export a given snapshot.
235

    
236
    """
237
    disk = objects.Disk.FromDict(params[0])
238
    dest_node = params[1]
239
    instance = objects.Instance.FromDict(params[2])
240
    return backend.ExportSnapshot(disk, dest_node, instance)
241

    
242
  @staticmethod
243
  def perspective_finalize_export(params):
244
    """Expose the finalize export functionality.
245

    
246
    """
247
    instance = objects.Instance.FromDict(params[0])
248
    snap_disks = [objects.Disk.FromDict(str_data)
249
                  for str_data in params[1]]
250
    return backend.FinalizeExport(instance, snap_disks)
251

    
252
  @staticmethod
253
  def perspective_export_info(params):
254
    """Query information about an existing export on this node.
255

    
256
    The given path may not contain an export, in which case we return
257
    None.
258

    
259
    """
260
    path = params[0]
261
    einfo = backend.ExportInfo(path)
262
    if einfo is None:
263
      return einfo
264
    return einfo.Dumps()
265

    
266
  @staticmethod
267
  def perspective_export_list(params):
268
    """List the available exports on this node.
269

    
270
    Note that as opposed to export_info, which may query data about an
271
    export in any path, this only queries the standard Ganeti path
272
    (constants.EXPORT_DIR).
273

    
274
    """
275
    return backend.ListExports()
276

    
277
  @staticmethod
278
  def perspective_export_remove(params):
279
    """Remove an export.
280

    
281
    """
282
    export = params[0]
283
    return backend.RemoveExport(export)
284

    
285
  # volume  --------------------------
286

    
287
  @staticmethod
288
  def perspective_volume_list(params):
289
    """Query the list of logical volumes in a given volume group.
290

    
291
    """
292
    vgname = params[0]
293
    return backend.GetVolumeList(vgname)
294

    
295
  @staticmethod
296
  def perspective_vg_list(params):
297
    """Query the list of volume groups.
298

    
299
    """
300
    return backend.ListVolumeGroups()
301

    
302
  # bridge  --------------------------
303

    
304
  @staticmethod
305
  def perspective_bridges_exist(params):
306
    """Check if all bridges given exist on this node.
307

    
308
    """
309
    bridges_list = params[0]
310
    return backend.BridgesExist(bridges_list)
311

    
312
  # instance  --------------------------
313

    
314
  @staticmethod
315
  def perspective_instance_os_add(params):
316
    """Install an OS on a given instance.
317

    
318
    """
319
    inst_s, os_disk, swap_disk = params
320
    inst = objects.Instance.FromDict(inst_s)
321
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
322

    
323
  @staticmethod
324
  def perspective_instance_run_rename(params):
325
    """Runs the OS rename script for an instance.
326

    
327
    """
328
    inst_s, old_name, os_disk, swap_disk = params
329
    inst = objects.Instance.FromDict(inst_s)
330
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
331

    
332
  @staticmethod
333
  def perspective_instance_os_import(params):
334
    """Run the import function of an OS onto a given instance.
335

    
336
    """
337
    inst_s, os_disk, swap_disk, src_node, src_image = params
338
    inst = objects.Instance.FromDict(inst_s)
339
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
340
                                        src_node, src_image)
341

    
342
  @staticmethod
343
  def perspective_instance_shutdown(params):
344
    """Shutdown an instance.
345

    
346
    """
347
    instance = objects.Instance.FromDict(params[0])
348
    return backend.ShutdownInstance(instance)
349

    
350
  @staticmethod
351
  def perspective_instance_start(params):
352
    """Start an instance.
353

    
354
    """
355
    instance = objects.Instance.FromDict(params[0])
356
    extra_args = params[1]
357
    return backend.StartInstance(instance, extra_args)
358

    
359
  @staticmethod
360
  def perspective_instance_migrate(params):
361
    """Migrates an instance.
362

    
363
    """
364
    instance, target, live = params
365
    return backend.MigrateInstance(instance, target, live)
366

    
367
  @staticmethod
368
  def perspective_instance_reboot(params):
369
    """Reboot an instance.
370

    
371
    """
372
    instance = objects.Instance.FromDict(params[0])
373
    reboot_type = params[1]
374
    extra_args = params[2]
375
    return backend.RebootInstance(instance, reboot_type, extra_args)
376

    
377
  @staticmethod
378
  def perspective_instance_info(params):
379
    """Query instance information.
380

    
381
    """
382
    return backend.GetInstanceInfo(params[0])
383

    
384
  @staticmethod
385
  def perspective_all_instances_info(params):
386
    """Query information about all instances.
387

    
388
    """
389
    return backend.GetAllInstancesInfo()
390

    
391
  @staticmethod
392
  def perspective_instance_list(params):
393
    """Query the list of running instances.
394

    
395
    """
396
    return backend.GetInstanceList()
397

    
398
  # node --------------------------
399

    
400
  @staticmethod
401
  def perspective_node_tcp_ping(params):
402
    """Do a TcpPing on the remote node.
403

    
404
    """
405
    return utils.TcpPing(params[1], params[2], timeout=params[3],
406
                         live_port_needed=params[4], source=params[0])
407

    
408
  @staticmethod
409
  def perspective_node_info(params):
410
    """Query node information.
411

    
412
    """
413
    vgname = params[0]
414
    return backend.GetNodeInfo(vgname)
415

    
416
  @staticmethod
417
  def perspective_node_add(params):
418
    """Complete the registration of this node in the cluster.
419

    
420
    """
421
    return backend.AddNode(params[0], params[1], params[2],
422
                           params[3], params[4], params[5])
423

    
424
  @staticmethod
425
  def perspective_node_verify(params):
426
    """Run a verify sequence on this node.
427

    
428
    """
429
    return backend.VerifyNode(params[0])
430

    
431
  @staticmethod
432
  def perspective_node_start_master(params):
433
    """Promote this node to master status.
434

    
435
    """
436
    return backend.StartMaster(params[0])
437

    
438
  @staticmethod
439
  def perspective_node_stop_master(params):
440
    """Demote this node from master status.
441

    
442
    """
443
    return backend.StopMaster(params[0])
444

    
445
  @staticmethod
446
  def perspective_node_leave_cluster(params):
447
    """Cleanup after leaving a cluster.
448

    
449
    """
450
    return backend.LeaveCluster()
451

    
452
  @staticmethod
453
  def perspective_node_volumes(params):
454
    """Query the list of all logical volume groups.
455

    
456
    """
457
    return backend.NodeVolumes()
458

    
459
  # cluster --------------------------
460

    
461
  @staticmethod
462
  def perspective_version(params):
463
    """Query version information.
464

    
465
    """
466
    return constants.PROTOCOL_VERSION
467

    
468
  @staticmethod
469
  def perspective_upload_file(params):
470
    """Upload a file.
471

    
472
    Note that the backend implementation imposes strict rules on which
473
    files are accepted.
474

    
475
    """
476
    return backend.UploadFile(*params)
477

    
478
  @staticmethod
479
  def perspective_master_info(params):
480
    """Query master information.
481

    
482
    """
483
    return backend.GetMasterInfo()
484

    
485
  # os -----------------------
486

    
487
  @staticmethod
488
  def perspective_os_diagnose(params):
489
    """Query detailed information about existing OSes.
490

    
491
    """
492
    return [os.ToDict() for os in backend.DiagnoseOS()]
493

    
494
  @staticmethod
495
  def perspective_os_get(params):
496
    """Query information about a given OS.
497

    
498
    """
499
    name = params[0]
500
    try:
501
      os_obj = backend.OSFromDisk(name)
502
    except errors.InvalidOS, err:
503
      os_obj = objects.OS.FromInvalidOS(err)
504
    return os_obj.ToDict()
505

    
506
  # hooks -----------------------
507

    
508
  @staticmethod
509
  def perspective_hooks_runner(params):
510
    """Run hook scripts.
511

    
512
    """
513
    hpath, phase, env = params
514
    hr = backend.HooksRunner()
515
    return hr.RunHooks(hpath, phase, env)
516

    
517
  # iallocator -----------------
518

    
519
  @staticmethod
520
  def perspective_iallocator_runner(params):
521
    """Run an iallocator script.
522

    
523
    """
524
    name, idata = params
525
    iar = backend.IAllocatorRunner()
526
    return iar.Run(name, idata)
527

    
528
  # test -----------------------
529

    
530
  @staticmethod
531
  def perspective_test_delay(params):
532
    """Run test delay.
533

    
534
    """
535
    duration = params[0]
536
    return utils.TestDelay(duration)
537

    
538
  # file storage ---------------
539

    
540
  @staticmethod
541
  def perspective_file_storage_dir_create(params):
542
    """Create the file storage directory.
543

    
544
    """
545
    file_storage_dir = params[0]
546
    return backend.CreateFileStorageDir(file_storage_dir)
547

    
548
  @staticmethod
549
  def perspective_file_storage_dir_remove(params):
550
    """Remove the file storage directory.
551

    
552
    """
553
    file_storage_dir = params[0]
554
    return backend.RemoveFileStorageDir(file_storage_dir)
555

    
556
  @staticmethod
557
  def perspective_file_storage_dir_rename(params):
558
    """Rename the file storage directory.
559

    
560
    """
561
    old_file_storage_dir = params[0]
562
    new_file_storage_dir = params[1]
563
    return backend.RenameFileStorageDir(old_file_storage_dir,
564
                                        new_file_storage_dir)
565

    
566
  # jobs ------------------------
567

    
568
  @staticmethod
569
  @_RequireJobQueueLock
570
  def perspective_jobqueue_update(params):
571
    """Update job queue.
572

    
573
    """
574
    (file_name, content) = params
575
    return backend.JobQueueUpdate(file_name, content)
576

    
577
  @staticmethod
578
  @_RequireJobQueueLock
579
  def perspective_jobqueue_purge(params):
580
    """Purge job queue.
581

    
582
    """
583
    return backend.JobQueuePurge()
584

    
585
  @staticmethod
586
  @_RequireJobQueueLock
587
  def perspective_jobqueue_rename(params):
588
    """Rename a job queue file.
589

    
590
    """
591
    (old, new) = params
592

    
593
    return backend.JobQueueRename(old, new)
594

    
595

    
596
class NodeDaemonHttpServer(http.HTTPServer):
  """HTTP server answering requests through NodeDaemonRequestHandler.

  The PID of the process creating the server is remembered in the
  noded_pid attribute.

  """
  def __init__(self, server_address):
    """Set up the server on server_address and record our own PID."""
    http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
    self.noded_pid = os.getpid()

  def serve_forever(self):
    """Handle requests until told to quit.

    A utils.SignalHandler is installed for SIGINT and SIGTERM; requests
    are handled one after the other for as long as its "called" attribute
    is false. The handler is reset before returning, even on errors.

    """
    quit_signals = [signal.SIGINT, signal.SIGTERM]
    quit_handler = utils.SignalHandler(quit_signals)
    try:
      while not quit_handler.called:
        self.handle_request()
      # TODO: There could be children running at this point
    finally:
      quit_handler.Reset()
610

    
611

    
612
class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
  """Forking variant of the node daemon HTTP server.

  SocketServer.ForkingMixIn is combined with NodeDaemonHttpServer so that
  a child process is forked for each request we handle, which lets more
  requests be handled concurrently.

  """
619

    
620

    
621
def ParseOptions():
  """Parse the command line options.

  Two flags are known: -f/--foreground stores False in options.fork
  (default True) and -d/--debug stores True in options.debug (default
  False).

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  # "%%prog" survives the formatting below as "%prog", which is left to
  # OptionParser
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION

  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d]",
                        version=version_text)
  parser.add_option("-f", "--foreground",
                    action="store_false", dest="fork", default=True,
                    help="Don't detach from the current terminal")
  parser.add_option("-d", "--debug",
                    action="store_true", dest="debug", default=False,
                    help="Enable some debug messages")

  return parser.parse_args()
641

    
642

    
643
def main():
644
  """Main function for the node daemon.
645

    
646
  """
647
  global queue_lock
648

    
649
  options, args = ParseOptions()
650
  utils.debug = options.debug
651
  for fname in (constants.SSL_CERT_FILE,):
652
    if not os.path.isfile(fname):
653
      print "config %s not there, will not run." % fname
654
      sys.exit(5)
655

    
656
  try:
657
    ss = ssconf.SimpleStore()
658
    port = ss.GetNodeDaemonPort()
659
    pwdata = ss.GetNodeDaemonPassword()
660
  except errors.ConfigurationError, err:
661
    print "Cluster configuration incomplete: '%s'" % str(err)
662
    sys.exit(5)
663

    
664
  # create the various SUB_RUN_DIRS, if not existing, so that we handle the
665
  # situation where RUN_DIR is tmpfs
666
  for dir_name in constants.SUB_RUN_DIRS:
667
    if not os.path.exists(dir_name):
668
      try:
669
        os.mkdir(dir_name, 0755)
670
      except EnvironmentError, err:
671
        if err.errno != errno.EEXIST:
672
          print ("Node setup wrong, cannot create directory %s: %s" %
673
                 (dir_name, err))
674
          sys.exit(5)
675
    if not os.path.isdir(dir_name):
676
      print ("Node setup wrong, %s is not a directory" % dir_name)
677
      sys.exit(5)
678

    
679
  # become a daemon
680
  if options.fork:
681
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
682

    
683
  utils.WritePidFile(constants.NODED_PID)
684

    
685
  logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
686
                      stderr_logging=not options.fork)
687
  logging.info("ganeti node daemon startup")
688

    
689
  # Prepare job queue
690
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
691

    
692
  if options.fork:
693
    server = ForkingHTTPServer(('', port))
694
  else:
695
    server = NodeDaemonHttpServer(('', port))
696

    
697
  try:
698
    server.serve_forever()
699
  finally:
700
    utils.RemovePidFile(constants.NODED_PID)
701

    
702

    
703
if __name__ == '__main__':
704
  main()