Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 82d9caef

History | View | Annotate | Download (18 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import constants
39
from ganeti import objects
40
from ganeti import errors
41
from ganeti import jstore
42
from ganeti import daemon
43
from ganeti import http
44
from ganeti import utils
45

    
46

    
47
queue_lock = None
48

    
49

    
50
def _RequireJobQueueLock(fn):
51
  """Decorator for job queue manipulating functions.
52

    
53
  """
54
  QUEUE_LOCK_TIMEOUT = 10
55

    
56
  def wrapper(*args, **kwargs):
57
    # Locking in exclusive, blocking mode because there could be several
58
    # children running at the same time. Waiting up to 10 seconds.
59
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
60
    try:
61
      return fn(*args, **kwargs)
62
    finally:
63
      queue_lock.Unlock()
64

    
65
  return wrapper
66

    
67

    
68
class NodeHttpServer(http.HttpServer):
69
  """The server implementation.
70

    
71
  This class holds all methods exposed over the RPC interface.
72

    
73
  """
74
  def __init__(self, *args, **kwargs):
75
    http.HttpServer.__init__(self, *args, **kwargs)
76
    self.noded_pid = os.getpid()
77

    
78
  def HandleRequest(self, req):
79
    """Handle a request.
80

    
81
    """
82
    if req.request_method.upper() != "PUT":
83
      raise http.HTTPBadRequest()
84

    
85
    path = req.request_path
86
    if path.startswith("/"):
87
      path = path[1:]
88

    
89
    method = getattr(self, "perspective_%s" % path, None)
90
    if method is None:
91
      raise http.HTTPNotFound()
92

    
93
    try:
94
      try:
95
        return method(req.request_post_data)
96
      except:
97
        logging.exception("Error in RPC call")
98
        raise
99
    except errors.QuitGanetiException, err:
100
      # Tell parent to quit
101
      os.kill(self.noded_pid, signal.SIGTERM)
102

    
103
  # the new block devices  --------------------------
104

    
105
  @staticmethod
106
  def perspective_blockdev_create(params):
107
    """Create a block device.
108

    
109
    """
110
    bdev_s, size, owner, on_primary, info = params
111
    bdev = objects.Disk.FromDict(bdev_s)
112
    if bdev is None:
113
      raise ValueError("can't unserialize data!")
114
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
115

    
116
  @staticmethod
117
  def perspective_blockdev_remove(params):
118
    """Remove a block device.
119

    
120
    """
121
    bdev_s = params[0]
122
    bdev = objects.Disk.FromDict(bdev_s)
123
    return backend.RemoveBlockDevice(bdev)
124

    
125
  @staticmethod
126
  def perspective_blockdev_rename(params):
127
    """Remove a block device.
128

    
129
    """
130
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
131
    return backend.RenameBlockDevices(devlist)
132

    
133
  @staticmethod
134
  def perspective_blockdev_assemble(params):
135
    """Assemble a block device.
136

    
137
    """
138
    bdev_s, owner, on_primary = params
139
    bdev = objects.Disk.FromDict(bdev_s)
140
    if bdev is None:
141
      raise ValueError("can't unserialize data!")
142
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
143

    
144
  @staticmethod
145
  def perspective_blockdev_shutdown(params):
146
    """Shutdown a block device.
147

    
148
    """
149
    bdev_s = params[0]
150
    bdev = objects.Disk.FromDict(bdev_s)
151
    if bdev is None:
152
      raise ValueError("can't unserialize data!")
153
    return backend.ShutdownBlockDevice(bdev)
154

    
155
  @staticmethod
156
  def perspective_blockdev_addchildren(params):
157
    """Add a child to a mirror device.
158

    
159
    Note: this is only valid for mirror devices. It's the caller's duty
160
    to send a correct disk, otherwise we raise an error.
161

    
162
    """
163
    bdev_s, ndev_s = params
164
    bdev = objects.Disk.FromDict(bdev_s)
165
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
166
    if bdev is None or ndevs.count(None) > 0:
167
      raise ValueError("can't unserialize data!")
168
    return backend.MirrorAddChildren(bdev, ndevs)
169

    
170
  @staticmethod
171
  def perspective_blockdev_removechildren(params):
172
    """Remove a child from a mirror device.
173

    
174
    This is only valid for mirror devices, of course. It's the callers
175
    duty to send a correct disk, otherwise we raise an error.
176

    
177
    """
178
    bdev_s, ndev_s = params
179
    bdev = objects.Disk.FromDict(bdev_s)
180
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
181
    if bdev is None or ndevs.count(None) > 0:
182
      raise ValueError("can't unserialize data!")
183
    return backend.MirrorRemoveChildren(bdev, ndevs)
184

    
185
  @staticmethod
186
  def perspective_blockdev_getmirrorstatus(params):
187
    """Return the mirror status for a list of disks.
188

    
189
    """
190
    disks = [objects.Disk.FromDict(dsk_s)
191
            for dsk_s in params]
192
    return backend.GetMirrorStatus(disks)
193

    
194
  @staticmethod
195
  def perspective_blockdev_find(params):
196
    """Expose the FindBlockDevice functionality for a disk.
197

    
198
    This will try to find but not activate a disk.
199

    
200
    """
201
    disk = objects.Disk.FromDict(params[0])
202
    return backend.FindBlockDevice(disk)
203

    
204
  @staticmethod
205
  def perspective_blockdev_snapshot(params):
206
    """Create a snapshot device.
207

    
208
    Note that this is only valid for LVM disks, if we get passed
209
    something else we raise an exception. The snapshot device can be
210
    remove by calling the generic block device remove call.
211

    
212
    """
213
    cfbd = objects.Disk.FromDict(params[0])
214
    return backend.SnapshotBlockDevice(cfbd)
215

    
216
  @staticmethod
217
  def perspective_blockdev_grow(params):
218
    """Grow a stack of devices.
219

    
220
    """
221
    cfbd = objects.Disk.FromDict(params[0])
222
    amount = params[1]
223
    return backend.GrowBlockDevice(cfbd, amount)
224

    
225
  @staticmethod
226
  def perspective_blockdev_close(params):
227
    """Closes the given block devices.
228

    
229
    """
230
    disks = [objects.Disk.FromDict(cf) for cf in params]
231
    return backend.CloseBlockDevices(disks)
232

    
233
  # export/import  --------------------------
234

    
235
  @staticmethod
236
  def perspective_snapshot_export(params):
237
    """Export a given snapshot.
238

    
239
    """
240
    disk = objects.Disk.FromDict(params[0])
241
    dest_node = params[1]
242
    instance = objects.Instance.FromDict(params[2])
243
    cluster_name = params[3]
244
    return backend.ExportSnapshot(disk, dest_node, instance, cluster_name)
245

    
246
  @staticmethod
247
  def perspective_finalize_export(params):
248
    """Expose the finalize export functionality.
249

    
250
    """
251
    instance = objects.Instance.FromDict(params[0])
252
    snap_disks = [objects.Disk.FromDict(str_data)
253
                  for str_data in params[1]]
254
    return backend.FinalizeExport(instance, snap_disks)
255

    
256
  @staticmethod
257
  def perspective_export_info(params):
258
    """Query information about an existing export on this node.
259

    
260
    The given path may not contain an export, in which case we return
261
    None.
262

    
263
    """
264
    path = params[0]
265
    einfo = backend.ExportInfo(path)
266
    if einfo is None:
267
      return einfo
268
    return einfo.Dumps()
269

    
270
  @staticmethod
271
  def perspective_export_list(params):
272
    """List the available exports on this node.
273

    
274
    Note that as opposed to export_info, which may query data about an
275
    export in any path, this only queries the standard Ganeti path
276
    (constants.EXPORT_DIR).
277

    
278
    """
279
    return backend.ListExports()
280

    
281
  @staticmethod
282
  def perspective_export_remove(params):
283
    """Remove an export.
284

    
285
    """
286
    export = params[0]
287
    return backend.RemoveExport(export)
288

    
289
  # volume  --------------------------
290

    
291
  @staticmethod
292
  def perspective_volume_list(params):
293
    """Query the list of logical volumes in a given volume group.
294

    
295
    """
296
    vgname = params[0]
297
    return backend.GetVolumeList(vgname)
298

    
299
  @staticmethod
300
  def perspective_vg_list(params):
301
    """Query the list of volume groups.
302

    
303
    """
304
    return backend.ListVolumeGroups()
305

    
306
  # bridge  --------------------------
307

    
308
  @staticmethod
309
  def perspective_bridges_exist(params):
310
    """Check if all bridges given exist on this node.
311

    
312
    """
313
    bridges_list = params[0]
314
    return backend.BridgesExist(bridges_list)
315

    
316
  # instance  --------------------------
317

    
318
  @staticmethod
319
  def perspective_instance_os_add(params):
320
    """Install an OS on a given instance.
321

    
322
    """
323
    inst_s = params[0]
324
    inst = objects.Instance.FromDict(inst_s)
325
    return backend.AddOSToInstance(inst)
326

    
327
  @staticmethod
328
  def perspective_instance_run_rename(params):
329
    """Runs the OS rename script for an instance.
330

    
331
    """
332
    inst_s, old_name = params
333
    inst = objects.Instance.FromDict(inst_s)
334
    return backend.RunRenameInstance(inst, old_name)
335

    
336
  @staticmethod
337
  def perspective_instance_os_import(params):
338
    """Run the import function of an OS onto a given instance.
339

    
340
    """
341
    inst_s, os_disk, swap_disk, src_node, src_image, cluster_name = params
342
    inst = objects.Instance.FromDict(inst_s)
343
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
344
                                        src_node, src_image, cluster_name)
345

    
346
  @staticmethod
347
  def perspective_instance_shutdown(params):
348
    """Shutdown an instance.
349

    
350
    """
351
    instance = objects.Instance.FromDict(params[0])
352
    return backend.ShutdownInstance(instance)
353

    
354
  @staticmethod
355
  def perspective_instance_start(params):
356
    """Start an instance.
357

    
358
    """
359
    instance = objects.Instance.FromDict(params[0])
360
    extra_args = params[1]
361
    return backend.StartInstance(instance, extra_args)
362

    
363
  @staticmethod
364
  def perspective_instance_migrate(params):
365
    """Migrates an instance.
366

    
367
    """
368
    instance, target, live = params
369
    instance = objects.Instance.FromDict(instance)
370
    return backend.MigrateInstance(instance, target, live)
371

    
372
  @staticmethod
373
  def perspective_instance_reboot(params):
374
    """Reboot an instance.
375

    
376
    """
377
    instance = objects.Instance.FromDict(params[0])
378
    reboot_type = params[1]
379
    extra_args = params[2]
380
    return backend.RebootInstance(instance, reboot_type, extra_args)
381

    
382
  @staticmethod
383
  def perspective_instance_info(params):
384
    """Query instance information.
385

    
386
    """
387
    return backend.GetInstanceInfo(params[0], params[1])
388

    
389
  @staticmethod
390
  def perspective_all_instances_info(params):
391
    """Query information about all instances.
392

    
393
    """
394
    return backend.GetAllInstancesInfo(params[0])
395

    
396
  @staticmethod
397
  def perspective_instance_list(params):
398
    """Query the list of running instances.
399

    
400
    """
401
    return backend.GetInstanceList(params[0])
402

    
403
  # node --------------------------
404

    
405
  @staticmethod
406
  def perspective_node_tcp_ping(params):
407
    """Do a TcpPing on the remote node.
408

    
409
    """
410
    return utils.TcpPing(params[1], params[2], timeout=params[3],
411
                         live_port_needed=params[4], source=params[0])
412

    
413
  @staticmethod
414
  def perspective_node_has_ip_address(params):
415
    """Checks if a node has the given ip address.
416

    
417
    """
418
    return utils.OwnIpAddress(params[0])
419

    
420
  @staticmethod
421
  def perspective_node_info(params):
422
    """Query node information.
423

    
424
    """
425
    vgname, hypervisor_type = params
426
    return backend.GetNodeInfo(vgname, hypervisor_type)
427

    
428
  @staticmethod
429
  def perspective_node_add(params):
430
    """Complete the registration of this node in the cluster.
431

    
432
    """
433
    return backend.AddNode(params[0], params[1], params[2],
434
                           params[3], params[4], params[5])
435

    
436
  @staticmethod
437
  def perspective_node_verify(params):
438
    """Run a verify sequence on this node.
439

    
440
    """
441
    return backend.VerifyNode(params[0], params[1])
442

    
443
  @staticmethod
444
  def perspective_node_start_master(params):
445
    """Promote this node to master status.
446

    
447
    """
448
    return backend.StartMaster(params[0])
449

    
450
  @staticmethod
451
  def perspective_node_stop_master(params):
452
    """Demote this node from master status.
453

    
454
    """
455
    return backend.StopMaster(params[0])
456

    
457
  @staticmethod
458
  def perspective_node_leave_cluster(params):
459
    """Cleanup after leaving a cluster.
460

    
461
    """
462
    return backend.LeaveCluster()
463

    
464
  @staticmethod
465
  def perspective_node_volumes(params):
466
    """Query the list of all logical volume groups.
467

    
468
    """
469
    return backend.NodeVolumes()
470

    
471
  # cluster --------------------------
472

    
473
  @staticmethod
474
  def perspective_version(params):
475
    """Query version information.
476

    
477
    """
478
    return constants.PROTOCOL_VERSION
479

    
480
  @staticmethod
481
  def perspective_upload_file(params):
482
    """Upload a file.
483

    
484
    Note that the backend implementation imposes strict rules on which
485
    files are accepted.
486

    
487
    """
488
    return backend.UploadFile(*params)
489

    
490
  @staticmethod
491
  def perspective_master_info(params):
492
    """Query master information.
493

    
494
    """
495
    return backend.GetMasterInfo()
496

    
497
  # os -----------------------
498

    
499
  @staticmethod
500
  def perspective_os_diagnose(params):
501
    """Query detailed information about existing OSes.
502

    
503
    """
504
    return [os.ToDict() for os in backend.DiagnoseOS()]
505

    
506
  @staticmethod
507
  def perspective_os_get(params):
508
    """Query information about a given OS.
509

    
510
    """
511
    name = params[0]
512
    try:
513
      os_obj = backend.OSFromDisk(name)
514
    except errors.InvalidOS, err:
515
      os_obj = objects.OS.FromInvalidOS(err)
516
    return os_obj.ToDict()
517

    
518
  # hooks -----------------------
519

    
520
  @staticmethod
521
  def perspective_hooks_runner(params):
522
    """Run hook scripts.
523

    
524
    """
525
    hpath, phase, env = params
526
    hr = backend.HooksRunner()
527
    return hr.RunHooks(hpath, phase, env)
528

    
529
  # iallocator -----------------
530

    
531
  @staticmethod
532
  def perspective_iallocator_runner(params):
533
    """Run an iallocator script.
534

    
535
    """
536
    name, idata = params
537
    iar = backend.IAllocatorRunner()
538
    return iar.Run(name, idata)
539

    
540
  # test -----------------------
541

    
542
  @staticmethod
543
  def perspective_test_delay(params):
544
    """Run test delay.
545

    
546
    """
547
    duration = params[0]
548
    return utils.TestDelay(duration)
549

    
550
  # file storage ---------------
551

    
552
  @staticmethod
553
  def perspective_file_storage_dir_create(params):
554
    """Create the file storage directory.
555

    
556
    """
557
    file_storage_dir = params[0]
558
    return backend.CreateFileStorageDir(file_storage_dir)
559

    
560
  @staticmethod
561
  def perspective_file_storage_dir_remove(params):
562
    """Remove the file storage directory.
563

    
564
    """
565
    file_storage_dir = params[0]
566
    return backend.RemoveFileStorageDir(file_storage_dir)
567

    
568
  @staticmethod
569
  def perspective_file_storage_dir_rename(params):
570
    """Rename the file storage directory.
571

    
572
    """
573
    old_file_storage_dir = params[0]
574
    new_file_storage_dir = params[1]
575
    return backend.RenameFileStorageDir(old_file_storage_dir,
576
                                        new_file_storage_dir)
577

    
578
  # jobs ------------------------
579

    
580
  @staticmethod
581
  @_RequireJobQueueLock
582
  def perspective_jobqueue_update(params):
583
    """Update job queue.
584

    
585
    """
586
    (file_name, content) = params
587
    return backend.JobQueueUpdate(file_name, content)
588

    
589
  @staticmethod
590
  @_RequireJobQueueLock
591
  def perspective_jobqueue_purge(params):
592
    """Purge job queue.
593

    
594
    """
595
    return backend.JobQueuePurge()
596

    
597
  @staticmethod
598
  @_RequireJobQueueLock
599
  def perspective_jobqueue_rename(params):
600
    """Rename a job queue file.
601

    
602
    """
603
    (old, new) = params
604

    
605
    return backend.JobQueueRename(old, new)
606

    
607
  @staticmethod
608
  def perspective_jobqueue_set_drain(params):
609
    """Set/unset the queue drain flag.
610

    
611
    """
612
    drain_flag = params[0]
613
    return backend.JobQueueSetDrainFlag(drain_flag)
614

    
615

    
616
  # hypervisor ---------------
617

    
618
  @staticmethod
619
  def perspective_hypervisor_validate_params(params):
620
    """Validate the hypervisor parameters.
621

    
622
    """
623
    (hvname, hvparams) = params
624
    return backend.ValidateHVParams(hvname, hvparams)
625

    
626

    
627
def ParseOptions():
628
  """Parse the command line options.
629

    
630
  Returns:
631
    (options, args) as from OptionParser.parse_args()
632

    
633
  """
634
  parser = OptionParser(description="Ganeti node daemon",
635
                        usage="%prog [-f] [-d]",
636
                        version="%%prog (ganeti) %s" %
637
                        constants.RELEASE_VERSION)
638

    
639
  parser.add_option("-f", "--foreground", dest="fork",
640
                    help="Don't detach from the current terminal",
641
                    default=True, action="store_false")
642
  parser.add_option("-d", "--debug", dest="debug",
643
                    help="Enable some debug messages",
644
                    default=False, action="store_true")
645
  options, args = parser.parse_args()
646
  return options, args
647

    
648

    
649
def main():
650
  """Main function for the node daemon.
651

    
652
  """
653
  global queue_lock
654

    
655
  options, args = ParseOptions()
656
  utils.debug = options.debug
657
  for fname in (constants.SSL_CERT_FILE,):
658
    if not os.path.isfile(fname):
659
      print "config %s not there, will not run." % fname
660
      sys.exit(5)
661

    
662
  try:
663
    port = utils.GetNodeDaemonPort()
664
    pwdata = utils.GetNodeDaemonPassword()
665
  except errors.ConfigurationError, err:
666
    print "Cluster configuration incomplete: '%s'" % str(err)
667
    sys.exit(5)
668

    
669
  # create the various SUB_RUN_DIRS, if not existing, so that we handle the
670
  # situation where RUN_DIR is tmpfs
671
  for dir_name in constants.SUB_RUN_DIRS:
672
    if not os.path.exists(dir_name):
673
      try:
674
        os.mkdir(dir_name, 0755)
675
      except EnvironmentError, err:
676
        if err.errno != errno.EEXIST:
677
          print ("Node setup wrong, cannot create directory %s: %s" %
678
                 (dir_name, err))
679
          sys.exit(5)
680
    if not os.path.isdir(dir_name):
681
      print ("Node setup wrong, %s is not a directory" % dir_name)
682
      sys.exit(5)
683

    
684
  # become a daemon
685
  if options.fork:
686
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
687

    
688
  utils.WritePidFile(constants.NODED_PID)
689
  try:
690
    utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
691
                       stderr_logging=not options.fork)
692
    logging.info("ganeti node daemon startup")
693

    
694
    # Prepare job queue
695
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
696

    
697
    mainloop = daemon.Mainloop()
698
    server = NodeHttpServer(mainloop, ("", port))
699
    server.Start()
700
    try:
701
      mainloop.Run()
702
    finally:
703
      server.Stop()
704
  finally:
705
    utils.RemovePidFile(constants.NODED_PID)
706

    
707

    
708
if __name__ == '__main__':
709
  main()