Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 5d672980

History | View | Annotate | Download (18.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import logger
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import errors
42
from ganeti import jstore
43
from ganeti import daemon
44
from ganeti import http
45
from ganeti import utils
46

    
47

    
48
queue_lock = None
49

    
50

    
51
def _RequireJobQueueLock(fn):
52
  """Decorator for job queue manipulating functions.
53

    
54
  """
55
  QUEUE_LOCK_TIMEOUT = 10
56

    
57
  def wrapper(*args, **kwargs):
58
    # Locking in exclusive, blocking mode because there could be several
59
    # children running at the same time. Waiting up to 10 seconds.
60
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
61
    try:
62
      return fn(*args, **kwargs)
63
    finally:
64
      queue_lock.Unlock()
65

    
66
  return wrapper
67

    
68

    
69
class NodeHttpServer(http.HttpServer):
70
  """The server implementation.
71

    
72
  This class holds all methods exposed over the RPC interface.
73

    
74
  """
75
  def __init__(self, *args, **kwargs):
76
    http.HttpServer.__init__(self, *args, **kwargs)
77
    self.noded_pid = os.getpid()
78

    
79
  def HandleRequest(self, req):
80
    """Handle a request.
81

    
82
    """
83
    if req.request_method.upper() != "PUT":
84
      raise http.HTTPBadRequest()
85

    
86
    path = req.request_path
87
    if path.startswith("/"):
88
      path = path[1:]
89

    
90
    method = getattr(self, "perspective_%s" % path, None)
91
    if method is None:
92
      raise http.HTTPNotFound()
93

    
94
    try:
95
      try:
96
        return method(req.request_post_data)
97
      except:
98
        logging.exception("Error in RPC call")
99
        raise
100
    except errors.QuitGanetiException, err:
101
      # Tell parent to quit
102
      os.kill(self.noded_pid, signal.SIGTERM)
103

    
104
  # the new block devices  --------------------------
105

    
106
  @staticmethod
107
  def perspective_blockdev_create(params):
108
    """Create a block device.
109

    
110
    """
111
    bdev_s, size, owner, on_primary, info = params
112
    bdev = objects.Disk.FromDict(bdev_s)
113
    if bdev is None:
114
      raise ValueError("can't unserialize data!")
115
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
116

    
117
  @staticmethod
118
  def perspective_blockdev_remove(params):
119
    """Remove a block device.
120

    
121
    """
122
    bdev_s = params[0]
123
    bdev = objects.Disk.FromDict(bdev_s)
124
    return backend.RemoveBlockDevice(bdev)
125

    
126
  @staticmethod
127
  def perspective_blockdev_rename(params):
128
    """Remove a block device.
129

    
130
    """
131
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
132
    return backend.RenameBlockDevices(devlist)
133

    
134
  @staticmethod
135
  def perspective_blockdev_assemble(params):
136
    """Assemble a block device.
137

    
138
    """
139
    bdev_s, owner, on_primary = params
140
    bdev = objects.Disk.FromDict(bdev_s)
141
    if bdev is None:
142
      raise ValueError("can't unserialize data!")
143
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
144

    
145
  @staticmethod
146
  def perspective_blockdev_shutdown(params):
147
    """Shutdown a block device.
148

    
149
    """
150
    bdev_s = params[0]
151
    bdev = objects.Disk.FromDict(bdev_s)
152
    if bdev is None:
153
      raise ValueError("can't unserialize data!")
154
    return backend.ShutdownBlockDevice(bdev)
155

    
156
  @staticmethod
157
  def perspective_blockdev_addchildren(params):
158
    """Add a child to a mirror device.
159

    
160
    Note: this is only valid for mirror devices. It's the caller's duty
161
    to send a correct disk, otherwise we raise an error.
162

    
163
    """
164
    bdev_s, ndev_s = params
165
    bdev = objects.Disk.FromDict(bdev_s)
166
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
167
    if bdev is None or ndevs.count(None) > 0:
168
      raise ValueError("can't unserialize data!")
169
    return backend.MirrorAddChildren(bdev, ndevs)
170

    
171
  @staticmethod
172
  def perspective_blockdev_removechildren(params):
173
    """Remove a child from a mirror device.
174

    
175
    This is only valid for mirror devices, of course. It's the callers
176
    duty to send a correct disk, otherwise we raise an error.
177

    
178
    """
179
    bdev_s, ndev_s = params
180
    bdev = objects.Disk.FromDict(bdev_s)
181
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
182
    if bdev is None or ndevs.count(None) > 0:
183
      raise ValueError("can't unserialize data!")
184
    return backend.MirrorRemoveChildren(bdev, ndevs)
185

    
186
  @staticmethod
187
  def perspective_blockdev_getmirrorstatus(params):
188
    """Return the mirror status for a list of disks.
189

    
190
    """
191
    disks = [objects.Disk.FromDict(dsk_s)
192
            for dsk_s in params]
193
    return backend.GetMirrorStatus(disks)
194

    
195
  @staticmethod
196
  def perspective_blockdev_find(params):
197
    """Expose the FindBlockDevice functionality for a disk.
198

    
199
    This will try to find but not activate a disk.
200

    
201
    """
202
    disk = objects.Disk.FromDict(params[0])
203
    return backend.FindBlockDevice(disk)
204

    
205
  @staticmethod
206
  def perspective_blockdev_snapshot(params):
207
    """Create a snapshot device.
208

    
209
    Note that this is only valid for LVM disks, if we get passed
210
    something else we raise an exception. The snapshot device can be
211
    remove by calling the generic block device remove call.
212

    
213
    """
214
    cfbd = objects.Disk.FromDict(params[0])
215
    return backend.SnapshotBlockDevice(cfbd)
216

    
217
  @staticmethod
218
  def perspective_blockdev_grow(params):
219
    """Grow a stack of devices.
220

    
221
    """
222
    cfbd = objects.Disk.FromDict(params[0])
223
    amount = params[1]
224
    return backend.GrowBlockDevice(cfbd, amount)
225

    
226
  @staticmethod
227
  def perspective_blockdev_close(params):
228
    """Closes the given block devices.
229

    
230
    """
231
    disks = [objects.Disk.FromDict(cf) for cf in params]
232
    return backend.CloseBlockDevices(disks)
233

    
234
  # export/import  --------------------------
235

    
236
  @staticmethod
237
  def perspective_snapshot_export(params):
238
    """Export a given snapshot.
239

    
240
    """
241
    disk = objects.Disk.FromDict(params[0])
242
    dest_node = params[1]
243
    instance = objects.Instance.FromDict(params[2])
244
    cluster_name = params[3]
245
    return backend.ExportSnapshot(disk, dest_node, instance, cluster_name)
246

    
247
  @staticmethod
248
  def perspective_finalize_export(params):
249
    """Expose the finalize export functionality.
250

    
251
    """
252
    instance = objects.Instance.FromDict(params[0])
253
    snap_disks = [objects.Disk.FromDict(str_data)
254
                  for str_data in params[1]]
255
    return backend.FinalizeExport(instance, snap_disks)
256

    
257
  @staticmethod
258
  def perspective_export_info(params):
259
    """Query information about an existing export on this node.
260

    
261
    The given path may not contain an export, in which case we return
262
    None.
263

    
264
    """
265
    path = params[0]
266
    einfo = backend.ExportInfo(path)
267
    if einfo is None:
268
      return einfo
269
    return einfo.Dumps()
270

    
271
  @staticmethod
272
  def perspective_export_list(params):
273
    """List the available exports on this node.
274

    
275
    Note that as opposed to export_info, which may query data about an
276
    export in any path, this only queries the standard Ganeti path
277
    (constants.EXPORT_DIR).
278

    
279
    """
280
    return backend.ListExports()
281

    
282
  @staticmethod
283
  def perspective_export_remove(params):
284
    """Remove an export.
285

    
286
    """
287
    export = params[0]
288
    return backend.RemoveExport(export)
289

    
290
  # volume  --------------------------
291

    
292
  @staticmethod
293
  def perspective_volume_list(params):
294
    """Query the list of logical volumes in a given volume group.
295

    
296
    """
297
    vgname = params[0]
298
    return backend.GetVolumeList(vgname)
299

    
300
  @staticmethod
301
  def perspective_vg_list(params):
302
    """Query the list of volume groups.
303

    
304
    """
305
    return backend.ListVolumeGroups()
306

    
307
  # bridge  --------------------------
308

    
309
  @staticmethod
310
  def perspective_bridges_exist(params):
311
    """Check if all bridges given exist on this node.
312

    
313
    """
314
    bridges_list = params[0]
315
    return backend.BridgesExist(bridges_list)
316

    
317
  # instance  --------------------------
318

    
319
  @staticmethod
320
  def perspective_instance_os_add(params):
321
    """Install an OS on a given instance.
322

    
323
    """
324
    inst_s, os_disk, swap_disk = params
325
    inst = objects.Instance.FromDict(inst_s)
326
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
327

    
328
  @staticmethod
329
  def perspective_instance_run_rename(params):
330
    """Runs the OS rename script for an instance.
331

    
332
    """
333
    inst_s, old_name, os_disk, swap_disk = params
334
    inst = objects.Instance.FromDict(inst_s)
335
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
336

    
337
  @staticmethod
338
  def perspective_instance_os_import(params):
339
    """Run the import function of an OS onto a given instance.
340

    
341
    """
342
    inst_s, os_disk, swap_disk, src_node, src_image, cluster_name = params
343
    inst = objects.Instance.FromDict(inst_s)
344
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
345
                                        src_node, src_image, cluster_name)
346

    
347
  @staticmethod
348
  def perspective_instance_shutdown(params):
349
    """Shutdown an instance.
350

    
351
    """
352
    instance = objects.Instance.FromDict(params[0])
353
    return backend.ShutdownInstance(instance)
354

    
355
  @staticmethod
356
  def perspective_instance_start(params):
357
    """Start an instance.
358

    
359
    """
360
    instance = objects.Instance.FromDict(params[0])
361
    extra_args = params[1]
362
    return backend.StartInstance(instance, extra_args)
363

    
364
  @staticmethod
365
  def perspective_instance_migrate(params):
366
    """Migrates an instance.
367

    
368
    """
369
    instance, target, live = params
370
    instance = objects.Instance.FromDict(instance)
371
    return backend.MigrateInstance(instance, target, live)
372

    
373
  @staticmethod
374
  def perspective_instance_reboot(params):
375
    """Reboot an instance.
376

    
377
    """
378
    instance = objects.Instance.FromDict(params[0])
379
    reboot_type = params[1]
380
    extra_args = params[2]
381
    return backend.RebootInstance(instance, reboot_type, extra_args)
382

    
383
  @staticmethod
384
  def perspective_instance_info(params):
385
    """Query instance information.
386

    
387
    """
388
    return backend.GetInstanceInfo(params[0], params[1])
389

    
390
  @staticmethod
391
  def perspective_all_instances_info(params):
392
    """Query information about all instances.
393

    
394
    """
395
    return backend.GetAllInstancesInfo(params[0])
396

    
397
  @staticmethod
398
  def perspective_instance_list(params):
399
    """Query the list of running instances.
400

    
401
    """
402
    return backend.GetInstanceList(params[0])
403

    
404
  # node --------------------------
405

    
406
  @staticmethod
407
  def perspective_node_tcp_ping(params):
408
    """Do a TcpPing on the remote node.
409

    
410
    """
411
    return utils.TcpPing(params[1], params[2], timeout=params[3],
412
                         live_port_needed=params[4], source=params[0])
413

    
414
  @staticmethod
415
  def perspective_node_has_ip_address(params):
416
    """Checks if a node has the given ip address.
417

    
418
    """
419
    return utils.OwnIpAddress(params[0])
420

    
421
  @staticmethod
422
  def perspective_node_info(params):
423
    """Query node information.
424

    
425
    """
426
    vgname, hypervisor_type = params
427
    return backend.GetNodeInfo(vgname, hypervisor_type)
428

    
429
  @staticmethod
430
  def perspective_node_add(params):
431
    """Complete the registration of this node in the cluster.
432

    
433
    """
434
    return backend.AddNode(params[0], params[1], params[2],
435
                           params[3], params[4], params[5])
436

    
437
  @staticmethod
438
  def perspective_node_verify(params):
439
    """Run a verify sequence on this node.
440

    
441
    """
442
    return backend.VerifyNode(params[0], params[1])
443

    
444
  @staticmethod
445
  def perspective_node_start_master(params):
446
    """Promote this node to master status.
447

    
448
    """
449
    return backend.StartMaster(params[0])
450

    
451
  @staticmethod
452
  def perspective_node_stop_master(params):
453
    """Demote this node from master status.
454

    
455
    """
456
    return backend.StopMaster(params[0])
457

    
458
  @staticmethod
459
  def perspective_node_leave_cluster(params):
460
    """Cleanup after leaving a cluster.
461

    
462
    """
463
    return backend.LeaveCluster()
464

    
465
  @staticmethod
466
  def perspective_node_volumes(params):
467
    """Query the list of all logical volume groups.
468

    
469
    """
470
    return backend.NodeVolumes()
471

    
472
  # cluster --------------------------
473

    
474
  @staticmethod
475
  def perspective_version(params):
476
    """Query version information.
477

    
478
    """
479
    return constants.PROTOCOL_VERSION
480

    
481
  @staticmethod
482
  def perspective_upload_file(params):
483
    """Upload a file.
484

    
485
    Note that the backend implementation imposes strict rules on which
486
    files are accepted.
487

    
488
    """
489
    return backend.UploadFile(*params)
490

    
491
  @staticmethod
492
  def perspective_master_info(params):
493
    """Query master information.
494

    
495
    """
496
    return backend.GetMasterInfo()
497

    
498
  # os -----------------------
499

    
500
  @staticmethod
501
  def perspective_os_diagnose(params):
502
    """Query detailed information about existing OSes.
503

    
504
    """
505
    return [os.ToDict() for os in backend.DiagnoseOS()]
506

    
507
  @staticmethod
508
  def perspective_os_get(params):
509
    """Query information about a given OS.
510

    
511
    """
512
    name = params[0]
513
    try:
514
      os_obj = backend.OSFromDisk(name)
515
    except errors.InvalidOS, err:
516
      os_obj = objects.OS.FromInvalidOS(err)
517
    return os_obj.ToDict()
518

    
519
  # hooks -----------------------
520

    
521
  @staticmethod
522
  def perspective_hooks_runner(params):
523
    """Run hook scripts.
524

    
525
    """
526
    hpath, phase, env = params
527
    hr = backend.HooksRunner()
528
    return hr.RunHooks(hpath, phase, env)
529

    
530
  # iallocator -----------------
531

    
532
  @staticmethod
533
  def perspective_iallocator_runner(params):
534
    """Run an iallocator script.
535

    
536
    """
537
    name, idata = params
538
    iar = backend.IAllocatorRunner()
539
    return iar.Run(name, idata)
540

    
541
  # test -----------------------
542

    
543
  @staticmethod
544
  def perspective_test_delay(params):
545
    """Run test delay.
546

    
547
    """
548
    duration = params[0]
549
    return utils.TestDelay(duration)
550

    
551
  # file storage ---------------
552

    
553
  @staticmethod
554
  def perspective_file_storage_dir_create(params):
555
    """Create the file storage directory.
556

    
557
    """
558
    file_storage_dir = params[0]
559
    return backend.CreateFileStorageDir(file_storage_dir)
560

    
561
  @staticmethod
562
  def perspective_file_storage_dir_remove(params):
563
    """Remove the file storage directory.
564

    
565
    """
566
    file_storage_dir = params[0]
567
    return backend.RemoveFileStorageDir(file_storage_dir)
568

    
569
  @staticmethod
570
  def perspective_file_storage_dir_rename(params):
571
    """Rename the file storage directory.
572

    
573
    """
574
    old_file_storage_dir = params[0]
575
    new_file_storage_dir = params[1]
576
    return backend.RenameFileStorageDir(old_file_storage_dir,
577
                                        new_file_storage_dir)
578

    
579
  # jobs ------------------------
580

    
581
  @staticmethod
582
  @_RequireJobQueueLock
583
  def perspective_jobqueue_update(params):
584
    """Update job queue.
585

    
586
    """
587
    (file_name, content) = params
588
    return backend.JobQueueUpdate(file_name, content)
589

    
590
  @staticmethod
591
  @_RequireJobQueueLock
592
  def perspective_jobqueue_purge(params):
593
    """Purge job queue.
594

    
595
    """
596
    return backend.JobQueuePurge()
597

    
598
  @staticmethod
599
  @_RequireJobQueueLock
600
  def perspective_jobqueue_rename(params):
601
    """Rename a job queue file.
602

    
603
    """
604
    (old, new) = params
605

    
606
    return backend.JobQueueRename(old, new)
607

    
608
  @staticmethod
609
  def perspective_jobqueue_set_drain(params):
610
    """Set/unset the queue drain flag.
611

    
612
    """
613
    drain_flag = params[0]
614
    return backend.JobQueueSetDrainFlag(drain_flag)
615

    
616

    
617
  # hypervisor ---------------
618

    
619
  @staticmethod
620
  def perspective_hypervisor_validate_params(params):
621
    """Validate the hypervisor parameters.
622

    
623
    """
624
    (hvname, hvparams) = params
625
    return backend.ValidateHVParams(hvname, hvparams)
626

    
627

    
628
def ParseOptions():
629
  """Parse the command line options.
630

    
631
  Returns:
632
    (options, args) as from OptionParser.parse_args()
633

    
634
  """
635
  parser = OptionParser(description="Ganeti node daemon",
636
                        usage="%prog [-f] [-d]",
637
                        version="%%prog (ganeti) %s" %
638
                        constants.RELEASE_VERSION)
639

    
640
  parser.add_option("-f", "--foreground", dest="fork",
641
                    help="Don't detach from the current terminal",
642
                    default=True, action="store_false")
643
  parser.add_option("-d", "--debug", dest="debug",
644
                    help="Enable some debug messages",
645
                    default=False, action="store_true")
646
  options, args = parser.parse_args()
647
  return options, args
648

    
649

    
650
def main():
651
  """Main function for the node daemon.
652

    
653
  """
654
  global queue_lock
655

    
656
  options, args = ParseOptions()
657
  utils.debug = options.debug
658
  for fname in (constants.SSL_CERT_FILE,):
659
    if not os.path.isfile(fname):
660
      print "config %s not there, will not run." % fname
661
      sys.exit(5)
662

    
663
  try:
664
    port = utils.GetNodeDaemonPort()
665
    pwdata = utils.GetNodeDaemonPassword()
666
  except errors.ConfigurationError, err:
667
    print "Cluster configuration incomplete: '%s'" % str(err)
668
    sys.exit(5)
669

    
670
  # create the various SUB_RUN_DIRS, if not existing, so that we handle the
671
  # situation where RUN_DIR is tmpfs
672
  for dir_name in constants.SUB_RUN_DIRS:
673
    if not os.path.exists(dir_name):
674
      try:
675
        os.mkdir(dir_name, 0755)
676
      except EnvironmentError, err:
677
        if err.errno != errno.EEXIST:
678
          print ("Node setup wrong, cannot create directory %s: %s" %
679
                 (dir_name, err))
680
          sys.exit(5)
681
    if not os.path.isdir(dir_name):
682
      print ("Node setup wrong, %s is not a directory" % dir_name)
683
      sys.exit(5)
684

    
685
  # become a daemon
686
  if options.fork:
687
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
688

    
689
  utils.WritePidFile(constants.NODED_PID)
690
  try:
691
    logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
692
                        stderr_logging=not options.fork)
693
    logging.info("ganeti node daemon startup")
694

    
695
    # Prepare job queue
696
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
697

    
698
    mainloop = daemon.Mainloop()
699
    server = NodeHttpServer(mainloop, ("", port))
700
    server.Start()
701
    try:
702
      mainloop.Run()
703
    finally:
704
      server.Stop()
705
  finally:
706
    utils.RemovePidFile(constants.NODED_PID)
707

    
708

    
709
if __name__ == '__main__':
710
  main()