Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ 96acbc09

History | View | Annotate | Download (20.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import SocketServer
31
import errno
32
import logging
33
import signal
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import backend
38
from ganeti import constants
39
from ganeti import objects
40
from ganeti import errors
41
from ganeti import jstore
42
from ganeti import daemon
43
from ganeti import http
44
from ganeti import utils
45
from ganeti import storage
46

    
47
import ganeti.http.server
48

    
49

    
50
# Lock used by _RequireJobQueueLock around job queue operations; stays
# None until ExecNODED stores the result of jstore.InitAndVerifyQueue.
queue_lock = None
51

    
52

    
53
def _RequireJobQueueLock(fn):
54
  """Decorator for job queue manipulating functions.
55

    
56
  """
57
  QUEUE_LOCK_TIMEOUT = 10
58

    
59
  def wrapper(*args, **kwargs):
60
    # Locking in exclusive, blocking mode because there could be several
61
    # children running at the same time. Waiting up to 10 seconds.
62
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
63
    try:
64
      return fn(*args, **kwargs)
65
    finally:
66
      queue_lock.Unlock()
67

    
68
  return wrapper
69

    
70

    
71
class NodeHttpServer(http.server.HttpServer):
72
  """The server implementation.
73

    
74
  This class holds all methods exposed over the RPC interface.
75

    
76
  """
77
  def __init__(self, *args, **kwargs):
78
    http.server.HttpServer.__init__(self, *args, **kwargs)
79
    self.noded_pid = os.getpid()
80

    
81
  def HandleRequest(self, req):
82
    """Handle a request.
83

    
84
    """
85
    if req.request_method.upper() != http.HTTP_PUT:
86
      raise http.HttpBadRequest()
87

    
88
    path = req.request_path
89
    if path.startswith("/"):
90
      path = path[1:]
91

    
92
    method = getattr(self, "perspective_%s" % path, None)
93
    if method is None:
94
      raise http.HttpNotFound()
95

    
96
    try:
97
      rvalue = method(req.request_body)
98
      return True, rvalue
99

    
100
    except backend.RPCFail, err:
101
      # our custom failure exception; str(err) works fine if the
102
      # exception was constructed with a single argument, and in
103
      # this case, err.message == err.args[0] == str(err)
104
      return (False, str(err))
105
    except errors.QuitGanetiException, err:
106
      # Tell parent to quit
107
      logging.info("Shutting down the node daemon, arguments: %s",
108
                   str(err.args))
109
      os.kill(self.noded_pid, signal.SIGTERM)
110
      # And return the error's arguments, which must be already in
111
      # correct tuple format
112
      return err.args
113
    except Exception, err:
114
      logging.exception("Error in RPC call")
115
      return False, "Error while executing backend function: %s" % str(err)
116

    
117
  # the new block devices  --------------------------
118

    
119
  @staticmethod
120
  def perspective_blockdev_create(params):
121
    """Create a block device.
122

    
123
    """
124
    bdev_s, size, owner, on_primary, info = params
125
    bdev = objects.Disk.FromDict(bdev_s)
126
    if bdev is None:
127
      raise ValueError("can't unserialize data!")
128
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
129

    
130
  @staticmethod
131
  def perspective_blockdev_remove(params):
132
    """Remove a block device.
133

    
134
    """
135
    bdev_s = params[0]
136
    bdev = objects.Disk.FromDict(bdev_s)
137
    return backend.BlockdevRemove(bdev)
138

    
139
  @staticmethod
140
  def perspective_blockdev_rename(params):
141
    """Remove a block device.
142

    
143
    """
144
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
145
    return backend.BlockdevRename(devlist)
146

    
147
  @staticmethod
148
  def perspective_blockdev_assemble(params):
149
    """Assemble a block device.
150

    
151
    """
152
    bdev_s, owner, on_primary = params
153
    bdev = objects.Disk.FromDict(bdev_s)
154
    if bdev is None:
155
      raise ValueError("can't unserialize data!")
156
    return backend.BlockdevAssemble(bdev, owner, on_primary)
157

    
158
  @staticmethod
159
  def perspective_blockdev_shutdown(params):
160
    """Shutdown a block device.
161

    
162
    """
163
    bdev_s = params[0]
164
    bdev = objects.Disk.FromDict(bdev_s)
165
    if bdev is None:
166
      raise ValueError("can't unserialize data!")
167
    return backend.BlockdevShutdown(bdev)
168

    
169
  @staticmethod
170
  def perspective_blockdev_addchildren(params):
171
    """Add a child to a mirror device.
172

    
173
    Note: this is only valid for mirror devices. It's the caller's duty
174
    to send a correct disk, otherwise we raise an error.
175

    
176
    """
177
    bdev_s, ndev_s = params
178
    bdev = objects.Disk.FromDict(bdev_s)
179
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
180
    if bdev is None or ndevs.count(None) > 0:
181
      raise ValueError("can't unserialize data!")
182
    return backend.BlockdevAddchildren(bdev, ndevs)
183

    
184
  @staticmethod
185
  def perspective_blockdev_removechildren(params):
186
    """Remove a child from a mirror device.
187

    
188
    This is only valid for mirror devices, of course. It's the callers
189
    duty to send a correct disk, otherwise we raise an error.
190

    
191
    """
192
    bdev_s, ndev_s = params
193
    bdev = objects.Disk.FromDict(bdev_s)
194
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
195
    if bdev is None or ndevs.count(None) > 0:
196
      raise ValueError("can't unserialize data!")
197
    return backend.BlockdevRemovechildren(bdev, ndevs)
198

    
199
  @staticmethod
200
  def perspective_blockdev_getmirrorstatus(params):
201
    """Return the mirror status for a list of disks.
202

    
203
    """
204
    disks = [objects.Disk.FromDict(dsk_s)
205
            for dsk_s in params]
206
    return backend.BlockdevGetmirrorstatus(disks)
207

    
208
  @staticmethod
209
  def perspective_blockdev_find(params):
210
    """Expose the FindBlockDevice functionality for a disk.
211

    
212
    This will try to find but not activate a disk.
213

    
214
    """
215
    disk = objects.Disk.FromDict(params[0])
216
    return backend.BlockdevFind(disk).ToDict()
217

    
218
  @staticmethod
219
  def perspective_blockdev_snapshot(params):
220
    """Create a snapshot device.
221

    
222
    Note that this is only valid for LVM disks, if we get passed
223
    something else we raise an exception. The snapshot device can be
224
    remove by calling the generic block device remove call.
225

    
226
    """
227
    cfbd = objects.Disk.FromDict(params[0])
228
    return backend.BlockdevSnapshot(cfbd)
229

    
230
  @staticmethod
231
  def perspective_blockdev_grow(params):
232
    """Grow a stack of devices.
233

    
234
    """
235
    cfbd = objects.Disk.FromDict(params[0])
236
    amount = params[1]
237
    return backend.BlockdevGrow(cfbd, amount)
238

    
239
  @staticmethod
240
  def perspective_blockdev_close(params):
241
    """Closes the given block devices.
242

    
243
    """
244
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
245
    return backend.BlockdevClose(params[0], disks)
246

    
247
  # blockdev/drbd specific methods ----------
248

    
249
  @staticmethod
250
  def perspective_drbd_disconnect_net(params):
251
    """Disconnects the network connection of drbd disks.
252

    
253
    Note that this is only valid for drbd disks, so the members of the
254
    disk list must all be drbd devices.
255

    
256
    """
257
    nodes_ip, disks = params
258
    disks = [objects.Disk.FromDict(cf) for cf in disks]
259
    return backend.DrbdDisconnectNet(nodes_ip, disks)
260

    
261
  @staticmethod
262
  def perspective_drbd_attach_net(params):
263
    """Attaches the network connection of drbd disks.
264

    
265
    Note that this is only valid for drbd disks, so the members of the
266
    disk list must all be drbd devices.
267

    
268
    """
269
    nodes_ip, disks, instance_name, multimaster = params
270
    disks = [objects.Disk.FromDict(cf) for cf in disks]
271
    return backend.DrbdAttachNet(nodes_ip, disks,
272
                                     instance_name, multimaster)
273

    
274
  @staticmethod
275
  def perspective_drbd_wait_sync(params):
276
    """Wait until DRBD disks are synched.
277

    
278
    Note that this is only valid for drbd disks, so the members of the
279
    disk list must all be drbd devices.
280

    
281
    """
282
    nodes_ip, disks = params
283
    disks = [objects.Disk.FromDict(cf) for cf in disks]
284
    return backend.DrbdWaitSync(nodes_ip, disks)
285

    
286
  # export/import  --------------------------
287

    
288
  @staticmethod
289
  def perspective_snapshot_export(params):
290
    """Export a given snapshot.
291

    
292
    """
293
    disk = objects.Disk.FromDict(params[0])
294
    dest_node = params[1]
295
    instance = objects.Instance.FromDict(params[2])
296
    cluster_name = params[3]
297
    dev_idx = params[4]
298
    return backend.ExportSnapshot(disk, dest_node, instance,
299
                                  cluster_name, dev_idx)
300

    
301
  @staticmethod
302
  def perspective_finalize_export(params):
303
    """Expose the finalize export functionality.
304

    
305
    """
306
    instance = objects.Instance.FromDict(params[0])
307
    snap_disks = [objects.Disk.FromDict(str_data)
308
                  for str_data in params[1]]
309
    return backend.FinalizeExport(instance, snap_disks)
310

    
311
  @staticmethod
312
  def perspective_export_info(params):
313
    """Query information about an existing export on this node.
314

    
315
    The given path may not contain an export, in which case we return
316
    None.
317

    
318
    """
319
    path = params[0]
320
    return backend.ExportInfo(path)
321

    
322
  @staticmethod
323
  def perspective_export_list(params):
324
    """List the available exports on this node.
325

    
326
    Note that as opposed to export_info, which may query data about an
327
    export in any path, this only queries the standard Ganeti path
328
    (constants.EXPORT_DIR).
329

    
330
    """
331
    return backend.ListExports()
332

    
333
  @staticmethod
334
  def perspective_export_remove(params):
335
    """Remove an export.
336

    
337
    """
338
    export = params[0]
339
    return backend.RemoveExport(export)
340

    
341
  # volume  --------------------------
342

    
343
  @staticmethod
344
  def perspective_lv_list(params):
345
    """Query the list of logical volumes in a given volume group.
346

    
347
    """
348
    vgname = params[0]
349
    return backend.GetVolumeList(vgname)
350

    
351
  @staticmethod
352
  def perspective_vg_list(params):
353
    """Query the list of volume groups.
354

    
355
    """
356
    return backend.ListVolumeGroups()
357

    
358
  # Storage --------------------------
359

    
360
  @staticmethod
361
  def perspective_storage_list(params):
362
    """Get list of storage units.
363

    
364
    """
365
    (su_name, su_args, name, fields) = params
366
    return storage.GetStorage(su_name, *su_args).List(name, fields)
367

    
368
  @staticmethod
369
  def perspective_storage_modify(params):
370
    """Modify a storage unit.
371

    
372
    """
373
    (su_name, su_args, name, changes) = params
374
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)
375

    
376
  # bridge  --------------------------
377

    
378
  @staticmethod
379
  def perspective_bridges_exist(params):
380
    """Check if all bridges given exist on this node.
381

    
382
    """
383
    bridges_list = params[0]
384
    return backend.BridgesExist(bridges_list)
385

    
386
  # instance  --------------------------
387

    
388
  @staticmethod
389
  def perspective_instance_os_add(params):
390
    """Install an OS on a given instance.
391

    
392
    """
393
    inst_s = params[0]
394
    inst = objects.Instance.FromDict(inst_s)
395
    reinstall = params[1]
396
    return backend.InstanceOsAdd(inst, reinstall)
397

    
398
  @staticmethod
399
  def perspective_instance_run_rename(params):
400
    """Runs the OS rename script for an instance.
401

    
402
    """
403
    inst_s, old_name = params
404
    inst = objects.Instance.FromDict(inst_s)
405
    return backend.RunRenameInstance(inst, old_name)
406

    
407
  @staticmethod
408
  def perspective_instance_os_import(params):
409
    """Run the import function of an OS onto a given instance.
410

    
411
    """
412
    inst_s, src_node, src_images, cluster_name = params
413
    inst = objects.Instance.FromDict(inst_s)
414
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
415
                                        cluster_name)
416

    
417
  @staticmethod
418
  def perspective_instance_shutdown(params):
419
    """Shutdown an instance.
420

    
421
    """
422
    instance = objects.Instance.FromDict(params[0])
423
    return backend.InstanceShutdown(instance)
424

    
425
  @staticmethod
426
  def perspective_instance_start(params):
427
    """Start an instance.
428

    
429
    """
430
    instance = objects.Instance.FromDict(params[0])
431
    return backend.StartInstance(instance)
432

    
433
  @staticmethod
434
  def perspective_migration_info(params):
435
    """Gather information about an instance to be migrated.
436

    
437
    """
438
    instance = objects.Instance.FromDict(params[0])
439
    return backend.MigrationInfo(instance)
440

    
441
  @staticmethod
442
  def perspective_accept_instance(params):
443
    """Prepare the node to accept an instance.
444

    
445
    """
446
    instance, info, target = params
447
    instance = objects.Instance.FromDict(instance)
448
    return backend.AcceptInstance(instance, info, target)
449

    
450
  @staticmethod
451
  def perspective_finalize_migration(params):
452
    """Finalize the instance migration.
453

    
454
    """
455
    instance, info, success = params
456
    instance = objects.Instance.FromDict(instance)
457
    return backend.FinalizeMigration(instance, info, success)
458

    
459
  @staticmethod
460
  def perspective_instance_migrate(params):
461
    """Migrates an instance.
462

    
463
    """
464
    instance, target, live = params
465
    instance = objects.Instance.FromDict(instance)
466
    return backend.MigrateInstance(instance, target, live)
467

    
468
  @staticmethod
469
  def perspective_instance_reboot(params):
470
    """Reboot an instance.
471

    
472
    """
473
    instance = objects.Instance.FromDict(params[0])
474
    reboot_type = params[1]
475
    return backend.InstanceReboot(instance, reboot_type)
476

    
477
  @staticmethod
478
  def perspective_instance_info(params):
479
    """Query instance information.
480

    
481
    """
482
    return backend.GetInstanceInfo(params[0], params[1])
483

    
484
  @staticmethod
485
  def perspective_instance_migratable(params):
486
    """Query whether the specified instance can be migrated.
487

    
488
    """
489
    instance = objects.Instance.FromDict(params[0])
490
    return backend.GetInstanceMigratable(instance)
491

    
492
  @staticmethod
493
  def perspective_all_instances_info(params):
494
    """Query information about all instances.
495

    
496
    """
497
    return backend.GetAllInstancesInfo(params[0])
498

    
499
  @staticmethod
500
  def perspective_instance_list(params):
501
    """Query the list of running instances.
502

    
503
    """
504
    return backend.GetInstanceList(params[0])
505

    
506
  # node --------------------------
507

    
508
  @staticmethod
509
  def perspective_node_tcp_ping(params):
510
    """Do a TcpPing on the remote node.
511

    
512
    """
513
    return utils.TcpPing(params[1], params[2], timeout=params[3],
514
                         live_port_needed=params[4], source=params[0])
515

    
516
  @staticmethod
517
  def perspective_node_has_ip_address(params):
518
    """Checks if a node has the given ip address.
519

    
520
    """
521
    return utils.OwnIpAddress(params[0])
522

    
523
  @staticmethod
524
  def perspective_node_info(params):
525
    """Query node information.
526

    
527
    """
528
    vgname, hypervisor_type = params
529
    return backend.GetNodeInfo(vgname, hypervisor_type)
530

    
531
  @staticmethod
532
  def perspective_node_add(params):
533
    """Complete the registration of this node in the cluster.
534

    
535
    """
536
    return backend.AddNode(params[0], params[1], params[2],
537
                           params[3], params[4], params[5])
538

    
539
  @staticmethod
540
  def perspective_node_verify(params):
541
    """Run a verify sequence on this node.
542

    
543
    """
544
    return backend.VerifyNode(params[0], params[1])
545

    
546
  @staticmethod
547
  def perspective_node_start_master(params):
548
    """Promote this node to master status.
549

    
550
    """
551
    return backend.StartMaster(params[0], params[1])
552

    
553
  @staticmethod
554
  def perspective_node_stop_master(params):
555
    """Demote this node from master status.
556

    
557
    """
558
    return backend.StopMaster(params[0])
559

    
560
  @staticmethod
561
  def perspective_node_leave_cluster(params):
562
    """Cleanup after leaving a cluster.
563

    
564
    """
565
    return backend.LeaveCluster()
566

    
567
  @staticmethod
568
  def perspective_node_volumes(params):
569
    """Query the list of all logical volume groups.
570

    
571
    """
572
    return backend.NodeVolumes()
573

    
574
  @staticmethod
575
  def perspective_node_demote_from_mc(params):
576
    """Demote a node from the master candidate role.
577

    
578
    """
579
    return backend.DemoteFromMC()
580

    
581

    
582
  @staticmethod
583
  def perspective_node_powercycle(params):
584
    """Tries to powercycle the nod.
585

    
586
    """
587
    hypervisor_type = params[0]
588
    return backend.PowercycleNode(hypervisor_type)
589

    
590

    
591
  # cluster --------------------------
592

    
593
  @staticmethod
594
  def perspective_version(params):
595
    """Query version information.
596

    
597
    """
598
    return constants.PROTOCOL_VERSION
599

    
600
  @staticmethod
601
  def perspective_upload_file(params):
602
    """Upload a file.
603

    
604
    Note that the backend implementation imposes strict rules on which
605
    files are accepted.
606

    
607
    """
608
    return backend.UploadFile(*params)
609

    
610
  @staticmethod
611
  def perspective_master_info(params):
612
    """Query master information.
613

    
614
    """
615
    return backend.GetMasterInfo()
616

    
617
  @staticmethod
618
  def perspective_write_ssconf_files(params):
619
    """Write ssconf files.
620

    
621
    """
622
    (values,) = params
623
    return backend.WriteSsconfFiles(values)
624

    
625
  # os -----------------------
626

    
627
  @staticmethod
628
  def perspective_os_diagnose(params):
629
    """Query detailed information about existing OSes.
630

    
631
    """
632
    return backend.DiagnoseOS()
633

    
634
  @staticmethod
635
  def perspective_os_get(params):
636
    """Query information about a given OS.
637

    
638
    """
639
    name = params[0]
640
    os_obj = backend.OSFromDisk(name)
641
    return os_obj.ToDict()
642

    
643
  # hooks -----------------------
644

    
645
  @staticmethod
646
  def perspective_hooks_runner(params):
647
    """Run hook scripts.
648

    
649
    """
650
    hpath, phase, env = params
651
    hr = backend.HooksRunner()
652
    return hr.RunHooks(hpath, phase, env)
653

    
654
  # iallocator -----------------
655

    
656
  @staticmethod
657
  def perspective_iallocator_runner(params):
658
    """Run an iallocator script.
659

    
660
    """
661
    name, idata = params
662
    iar = backend.IAllocatorRunner()
663
    return iar.Run(name, idata)
664

    
665
  # test -----------------------
666

    
667
  @staticmethod
668
  def perspective_test_delay(params):
669
    """Run test delay.
670

    
671
    """
672
    duration = params[0]
673
    status, rval = utils.TestDelay(duration)
674
    if not status:
675
      raise backend.RPCFail(rval)
676
    return rval
677

    
678
  # file storage ---------------
679

    
680
  @staticmethod
681
  def perspective_file_storage_dir_create(params):
682
    """Create the file storage directory.
683

    
684
    """
685
    file_storage_dir = params[0]
686
    return backend.CreateFileStorageDir(file_storage_dir)
687

    
688
  @staticmethod
689
  def perspective_file_storage_dir_remove(params):
690
    """Remove the file storage directory.
691

    
692
    """
693
    file_storage_dir = params[0]
694
    return backend.RemoveFileStorageDir(file_storage_dir)
695

    
696
  @staticmethod
697
  def perspective_file_storage_dir_rename(params):
698
    """Rename the file storage directory.
699

    
700
    """
701
    old_file_storage_dir = params[0]
702
    new_file_storage_dir = params[1]
703
    return backend.RenameFileStorageDir(old_file_storage_dir,
704
                                        new_file_storage_dir)
705

    
706
  # jobs ------------------------
707

    
708
  @staticmethod
709
  @_RequireJobQueueLock
710
  def perspective_jobqueue_update(params):
711
    """Update job queue.
712

    
713
    """
714
    (file_name, content) = params
715
    return backend.JobQueueUpdate(file_name, content)
716

    
717
  @staticmethod
718
  @_RequireJobQueueLock
719
  def perspective_jobqueue_purge(params):
720
    """Purge job queue.
721

    
722
    """
723
    return backend.JobQueuePurge()
724

    
725
  @staticmethod
726
  @_RequireJobQueueLock
727
  def perspective_jobqueue_rename(params):
728
    """Rename a job queue file.
729

    
730
    """
731
    # TODO: What if a file fails to rename?
732
    return [backend.JobQueueRename(old, new) for old, new in params]
733

    
734
  @staticmethod
735
  def perspective_jobqueue_set_drain(params):
736
    """Set/unset the queue drain flag.
737

    
738
    """
739
    drain_flag = params[0]
740
    return backend.JobQueueSetDrainFlag(drain_flag)
741

    
742

    
743
  # hypervisor ---------------
744

    
745
  @staticmethod
746
  def perspective_hypervisor_validate_params(params):
747
    """Validate the hypervisor parameters.
748

    
749
    """
750
    (hvname, hvparams) = params
751
    return backend.ValidateHVParams(hvname, hvparams)
752

    
753

    
754
def ExecNODED(options, args):
  """Main NODED function, executed with the pidfile held.

  Prepares the optional SSL parameters, initializes the job queue
  (storing its lock in the module-level queue_lock), starts the node
  HTTP server and runs the main loop. The server is stopped when the
  main loop ends, whether normally or through an exception. The args
  parameter is not used.

  """
  global queue_lock

  # SSL key and certificate are only needed when SSL was requested
  ssl_params = None
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)

  # The job queue has to be ready before any request is served
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)

  loop = daemon.Mainloop()
  noded = NodeHttpServer(loop, options.bind_address, options.port,
                         ssl_params=ssl_params, ssl_verify_peer=True)
  noded.Start()
  try:
    loop.Run()
  finally:
    noded.Stop()
778

    
779

    
780
def main():
781
  """Main function for the node daemon.
782

    
783
  """
784
  parser = OptionParser(description="Ganeti node daemon",
785
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
786
                        version="%%prog (ganeti) %s" %
787
                        constants.RELEASE_VERSION)
788
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
789
  dirs.append((constants.LOG_OS_DIR, 0750))
790
  dirs.append((constants.LOCK_DIR, 1777))
791
  daemon.GenericMain(constants.NODED, parser, dirs, None, ExecNODED)
792

    
793

    
794
# Start the daemon only when this file is executed as a script.
if __name__ == '__main__':
  main()