Statistics
| Branch: | Tag: | Revision:

root / lib / server / noded.py @ a303027b

History | View | Annotate | Download (36.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
import sys
import logging
import signal
import codecs
import functools

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti.storage import container
from ganeti import serializer
from ganeti import netutils
from ganeti import pathutils
from ganeti import ssconf

import ganeti.http.server # pylint: disable=W0611
55

    
56

    
57
# Job queue lock; initialized lazily by _PrepareQueueLock on first use
queue_lock = None
58

    
59

    
60
def _extendReasonTrail(trail, source, reason=""):
  """Extend the reason trail with noded information

  The trail is extended by appending the name of the noded functionality

  @param trail: list of reason trail entries, extended in place with a
      C{(source, reason, timestamp)} tuple
  @param source: name of the noded functionality, combined with the
      noded reason-source prefix
  @param reason: optional free-form reason text

  """
  assert trail is not None
  trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
  trail.append((trail_source, reason, utils.EpochNano()))
68

    
69

    
70
def _PrepareQueueLock():
  """Try to prepare the queue lock.

  Initializes the module-level C{queue_lock} on first call; once it is
  set, subsequent calls are no-ops.

  @return: None for success, otherwise an exception object

  """
  global queue_lock # pylint: disable=W0603

  if queue_lock is not None:
    # Already initialized by an earlier call
    return None

  # Prepare job queue
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError as err:
    # "except ... as" replaces the deprecated comma form (Python 2.6+);
    # the error is returned, not raised, for the caller to inspect
    return err
87

    
88

    
89
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Ensures the job queue lock exists and is held in exclusive mode around
  the call to C{fn}.

  @param fn: function to wrap
  @return: wrapped function that acquires/releases the job queue lock
  @raise errors.JobQueueError: if the queue lock cannot be initialized

  """
  QUEUE_LOCK_TIMEOUT = 10

  # functools.wraps preserves the wrapped function's name/docstring,
  # which the bare inner function previously clobbered
  @functools.wraps(fn)
  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper
108

    
109

    
110
def _DecodeImportExportIO(ieio, ieioargs):
  """Decode import/export I/O information.

  For raw-disk and script I/O the first argument is a serialized disk,
  which is deserialized here; any other kind is passed through unchanged.

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, )

  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, ieioargs[1])

  return ieioargs
123

    
124

    
125
def _DefaultAlternative(value, default):
126
  """Returns value or, if evaluating to False, a default value.
127

128
  Returns the given value, unless it evaluates to False. In the latter case the
129
  default value is returned.
130

131
  @param value: Value to return if it doesn't evaluate to False
132
  @param default: Default value
133
  @return: Given value or the default
134

135
  """
136
  if value:
137
    return value
138

    
139
  return default
140

    
141

    
142
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Subclass ensuring request handlers are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    # Lock memory before running the base constructor, so the executor
    # is resident in RAM from the start
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
150

    
151

    
152
class NodeRequestHandler(http.server.HttpServerHandler):
153
  """The server implementation.
154

155
  This class holds all methods exposed over the RPC interface.
156

157
  """
158
  # too many public methods, and unused args - all methods get params
159
  # due to the API
160
  # pylint: disable=R0904,W0613
161
  def __init__(self):
    http.server.HttpServerHandler.__init__(self)
    # Remember our own PID so request handlers can signal the daemon
    # (HandleRequest sends SIGTERM to it on QuitGanetiException)
    self.noded_pid = os.getpid()
164

    
165
  def HandleRequest(self, req):
    """Handle a request.

    Dispatches a POST request to the matching C{perspective_*} method and
    returns the JSON-serialized (success, payload) result.

    @param req: incoming HTTP request object
    @return: serialized result tuple
    @raise http.HttpBadRequest: if the request method is not POST
    @raise http.HttpNotFound: if no handler matches the request path

    """
    if req.request_method.upper() != http.HTTP_POST:
      raise http.HttpBadRequest("Only the POST method is supported")

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    # NOTE: "except X as err" replaces the deprecated Python 2.5 comma
    # syntax; semantics are unchanged
    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail as err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException as err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception as err:
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)
202

    
203
  # the new block devices  --------------------------
204

    
205
  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    (disk_dict, size, owner, on_primary, info, excl_stor) = params
    disk = objects.Disk.FromDict(disk_dict)
    if disk is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(disk, size, owner, on_primary, info,
                                  excl_stor)
216

    
217
  @staticmethod
218
  def perspective_blockdev_pause_resume_sync(params):
219
    """Pause/resume sync of a block device.
220

221
    """
222
    disks_s, pause = params
223
    disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
224
    return backend.BlockdevPauseResumeSync(disks, pause)
225

    
226
  @staticmethod
227
  def perspective_blockdev_image(params):
228
    """Image a block device.
229

230
    """
231
    bdev_s, image, size = params
232
    bdev = objects.Disk.FromDict(bdev_s)
233
    return backend.BlockdevImage(bdev, image, size)
234

    
235
  @staticmethod
236
  def perspective_blockdev_wipe(params):
237
    """Wipe a block device.
238

239
    """
240
    bdev_s, offset, size = params
241
    bdev = objects.Disk.FromDict(bdev_s)
242
    return backend.BlockdevWipe(bdev, offset, size)
243

    
244
  @staticmethod
245
  def perspective_blockdev_remove(params):
246
    """Remove a block device.
247

248
    """
249
    bdev_s = params[0]
250
    bdev = objects.Disk.FromDict(bdev_s)
251
    return backend.BlockdevRemove(bdev)
252

    
253
  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    Each element of C{params[0]} is a (serialized disk, new unique id)
    pair.  Note: the original docstring said "Remove", which was a
    copy-paste error.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
    return backend.BlockdevRename(devlist)
260

    
261
  @staticmethod
262
  def perspective_blockdev_assemble(params):
263
    """Assemble a block device.
264

265
    """
266
    bdev_s, owner, on_primary, idx = params
267
    bdev = objects.Disk.FromDict(bdev_s)
268
    if bdev is None:
269
      raise ValueError("can't unserialize data!")
270
    return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
271

    
272
  @staticmethod
273
  def perspective_blockdev_shutdown(params):
274
    """Shutdown a block device.
275

276
    """
277
    bdev_s = params[0]
278
    bdev = objects.Disk.FromDict(bdev_s)
279
    if bdev is None:
280
      raise ValueError("can't unserialize data!")
281
    return backend.BlockdevShutdown(bdev)
282

    
283
  @staticmethod
284
  def perspective_blockdev_addchildren(params):
285
    """Add a child to a mirror device.
286

287
    Note: this is only valid for mirror devices. It's the caller's duty
288
    to send a correct disk, otherwise we raise an error.
289

290
    """
291
    bdev_s, ndev_s = params
292
    bdev = objects.Disk.FromDict(bdev_s)
293
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
294
    if bdev is None or ndevs.count(None) > 0:
295
      raise ValueError("can't unserialize data!")
296
    return backend.BlockdevAddchildren(bdev, ndevs)
297

    
298
  @staticmethod
299
  def perspective_blockdev_removechildren(params):
300
    """Remove a child from a mirror device.
301

302
    This is only valid for mirror devices, of course. It's the callers
303
    duty to send a correct disk, otherwise we raise an error.
304

305
    """
306
    bdev_s, ndev_s = params
307
    bdev = objects.Disk.FromDict(bdev_s)
308
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
309
    if bdev is None or ndevs.count(None) > 0:
310
      raise ValueError("can't unserialize data!")
311
    return backend.BlockdevRemovechildren(bdev, ndevs)
312

    
313
  @staticmethod
314
  def perspective_blockdev_getmirrorstatus(params):
315
    """Return the mirror status for a list of disks.
316

317
    """
318
    disks = [objects.Disk.FromDict(dsk_s)
319
             for dsk_s in params[0]]
320
    return [status.ToDict()
321
            for status in backend.BlockdevGetmirrorstatus(disks)]
322

    
323
  @staticmethod
  def perspective_blockdev_getmirrorstatus_multi(params):
    """Return the mirror status for a list of disks.

    """
    (node_disks, ) = params

    disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]

    # Serialize the status object only for successful entries; failed
    # entries carry their payload through unchanged
    return [(success, status.ToDict() if success else status)
            for (success, status)
            in backend.BlockdevGetmirrorstatusMulti(disks)]
341

    
342
  @staticmethod
343
  def perspective_blockdev_find(params):
344
    """Expose the FindBlockDevice functionality for a disk.
345

346
    This will try to find but not activate a disk.
347

348
    """
349
    disk = objects.Disk.FromDict(params[0])
350

    
351
    result = backend.BlockdevFind(disk)
352
    if result is None:
353
      return None
354

    
355
    return result.ToDict()
356

    
357
  @staticmethod
358
  def perspective_blockdev_snapshot(params):
359
    """Create a snapshot device.
360

361
    Note that this is only valid for LVM disks, if we get passed
362
    something else we raise an exception. The snapshot device can be
363
    remove by calling the generic block device remove call.
364

365
    """
366
    cfbd = objects.Disk.FromDict(params[0])
367
    return backend.BlockdevSnapshot(cfbd)
368

    
369
  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    if len(params) < 5:
      raise ValueError("Received only %s parameters in blockdev_grow,"
                       " old master?" % len(params))
    cfbd = objects.Disk.FromDict(params[0])
    # The length was checked above, so this slice always yields 4 items
    (amount, dryrun, backingstore, excl_stor) = params[1:5]
    return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
383

    
384
  @staticmethod
385
  def perspective_blockdev_close(params):
386
    """Closes the given block devices.
387

388
    """
389
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
390
    return backend.BlockdevClose(params[0], disks)
391

    
392
  @staticmethod
393
  def perspective_blockdev_getdimensions(params):
394
    """Compute the sizes of the given block devices.
395

396
    """
397
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
398
    return backend.BlockdevGetdimensions(disks)
399

    
400
  @staticmethod
401
  def perspective_blockdev_setinfo(params):
402
    """Sets metadata information on the given block device.
403

404
    """
405
    (disk, info) = params
406
    disk = objects.Disk.FromDict(disk)
407
    return backend.BlockdevSetInfo(disk, info)
408

    
409
  # blockdev/drbd specific methods ----------
410

    
411
  @staticmethod
412
  def perspective_drbd_disconnect_net(params):
413
    """Disconnects the network connection of drbd disks.
414

415
    Note that this is only valid for drbd disks, so the members of the
416
    disk list must all be drbd devices.
417

418
    """
419
    (disks,) = params
420
    disks = [objects.Disk.FromDict(disk) for disk in disks]
421
    return backend.DrbdDisconnectNet(disks)
422

    
423
  @staticmethod
424
  def perspective_drbd_attach_net(params):
425
    """Attaches the network connection of drbd disks.
426

427
    Note that this is only valid for drbd disks, so the members of the
428
    disk list must all be drbd devices.
429

430
    """
431
    disks, instance_name, multimaster = params
432
    disks = [objects.Disk.FromDict(disk) for disk in disks]
433
    return backend.DrbdAttachNet(disks, instance_name, multimaster)
434

    
435
  @staticmethod
436
  def perspective_drbd_wait_sync(params):
437
    """Wait until DRBD disks are synched.
438

439
    Note that this is only valid for drbd disks, so the members of the
440
    disk list must all be drbd devices.
441

442
    """
443
    (disks,) = params
444
    disks = [objects.Disk.FromDict(disk) for disk in disks]
445
    return backend.DrbdWaitSync(disks)
446

    
447
  @staticmethod
448
  def perspective_drbd_needs_activation(params):
449
    """Checks if the drbd devices need activation
450

451
    Note that this is only valid for drbd disks, so the members of the
452
    disk list must all be drbd devices.
453

454
    """
455
    (disks,) = params
456
    disks = [objects.Disk.FromDict(disk) for disk in disks]
457
    return backend.DrbdNeedsActivation(disks)
458

    
459
  @staticmethod
460
  def perspective_drbd_helper(_):
461
    """Query drbd helper.
462

463
    """
464
    return backend.GetDrbdUsermodeHelper()
465

    
466
  # export/import  --------------------------
467

    
468
  @staticmethod
469
  def perspective_finalize_export(params):
470
    """Expose the finalize export functionality.
471

472
    """
473
    instance = objects.Instance.FromDict(params[0])
474

    
475
    snap_disks = []
476
    for disk in params[1]:
477
      if isinstance(disk, bool):
478
        snap_disks.append(disk)
479
      else:
480
        snap_disks.append(objects.Disk.FromDict(disk))
481

    
482
    return backend.FinalizeExport(instance, snap_disks)
483

    
484
  @staticmethod
485
  def perspective_export_info(params):
486
    """Query information about an existing export on this node.
487

488
    The given path may not contain an export, in which case we return
489
    None.
490

491
    """
492
    path = params[0]
493
    return backend.ExportInfo(path)
494

    
495
  @staticmethod
496
  def perspective_export_list(params):
497
    """List the available exports on this node.
498

499
    Note that as opposed to export_info, which may query data about an
500
    export in any path, this only queries the standard Ganeti path
501
    (pathutils.EXPORT_DIR).
502

503
    """
504
    return backend.ListExports()
505

    
506
  @staticmethod
507
  def perspective_export_remove(params):
508
    """Remove an export.
509

510
    """
511
    export = params[0]
512
    return backend.RemoveExport(export)
513

    
514
  # block device ---------------------
515
  @staticmethod
516
  def perspective_bdev_sizes(params):
517
    """Query the list of block devices
518

519
    """
520
    devices = params[0]
521
    return backend.GetBlockDevSizes(devices)
522

    
523
  # volume  --------------------------
524

    
525
  @staticmethod
526
  def perspective_lv_list(params):
527
    """Query the list of logical volumes in a given volume group.
528

529
    """
530
    vgname = params[0]
531
    return backend.GetVolumeList(vgname)
532

    
533
  @staticmethod
534
  def perspective_vg_list(params):
535
    """Query the list of volume groups.
536

537
    """
538
    return backend.ListVolumeGroups()
539

    
540
  # Storage --------------------------
541

    
542
  @staticmethod
543
  def perspective_storage_list(params):
544
    """Get list of storage units.
545

546
    """
547
    (su_name, su_args, name, fields) = params
548
    return container.GetStorage(su_name, *su_args).List(name, fields)
549

    
550
  @staticmethod
551
  def perspective_storage_modify(params):
552
    """Modify a storage unit.
553

554
    """
555
    (su_name, su_args, name, changes) = params
556
    return container.GetStorage(su_name, *su_args).Modify(name, changes)
557

    
558
  @staticmethod
559
  def perspective_storage_execute(params):
560
    """Execute an operation on a storage unit.
561

562
    """
563
    (su_name, su_args, name, op) = params
564
    return container.GetStorage(su_name, *su_args).Execute(name, op)
565

    
566
  # bridge  --------------------------
567

    
568
  @staticmethod
569
  def perspective_bridges_exist(params):
570
    """Check if all bridges given exist on this node.
571

572
    """
573
    bridges_list = params[0]
574
    return backend.BridgesExist(bridges_list)
575

    
576
  # instance  --------------------------
577

    
578
  @staticmethod
579
  def perspective_instance_os_add(params):
580
    """Install an OS on a given instance.
581

582
    """
583
    inst_s = params[0]
584
    inst = objects.Instance.FromDict(inst_s)
585
    reinstall = params[1]
586
    debug = params[2]
587
    return backend.InstanceOsAdd(inst, reinstall, debug)
588

    
589
  @staticmethod
590
  def perspective_instance_run_rename(params):
591
    """Runs the OS rename script for an instance.
592

593
    """
594
    inst_s, old_name, debug = params
595
    inst = objects.Instance.FromDict(inst_s)
596
    return backend.RunRenameInstance(inst, old_name, debug)
597

    
598
  @staticmethod
599
  def perspective_instance_shutdown(params):
600
    """Shutdown an instance.
601

602
    """
603
    instance = objects.Instance.FromDict(params[0])
604
    timeout = params[1]
605
    trail = params[2]
606
    _extendReasonTrail(trail, "shutdown")
607
    return backend.InstanceShutdown(instance, timeout, trail)
608

    
609
  @staticmethod
610
  def perspective_instance_start(params):
611
    """Start an instance.
612

613
    """
614
    (instance_name, startup_paused, trail) = params
615
    instance = objects.Instance.FromDict(instance_name)
616
    _extendReasonTrail(trail, "start")
617
    return backend.StartInstance(instance, startup_paused, trail)
618

    
619
  @staticmethod
  def perspective_hotplug_device(params):
    """Hotplugs device to a running instance.

    @raise ValueError: if the device type is not a hotpluggable target

    """
    (idict, action, dev_type, ddict, extra, seq) = params
    instance = objects.Instance.FromDict(idict)
    if dev_type == constants.HOTPLUG_TARGET_DISK:
      device = objects.Disk.FromDict(ddict)
    elif dev_type == constants.HOTPLUG_TARGET_NIC:
      device = objects.NIC.FromDict(ddict)
    else:
      # The original code only asserted here and then fell through,
      # hitting a NameError on the unbound "device" below (and losing
      # even the assert under "python -O"); fail explicitly instead
      assert dev_type in constants.HOTPLUG_ALL_TARGETS
      raise ValueError("Unhandled hotplug device type: %s" % dev_type)
    return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)
633

    
634
  @staticmethod
635
  def perspective_hotplug_supported(params):
636
    """Checks if hotplug is supported.
637

638
    """
639
    instance = objects.Instance.FromDict(params[0])
640
    return backend.HotplugSupported(instance)
641

    
642
  @staticmethod
643
  def perspective_migration_info(params):
644
    """Gather information about an instance to be migrated.
645

646
    """
647
    instance = objects.Instance.FromDict(params[0])
648
    return backend.MigrationInfo(instance)
649

    
650
  @staticmethod
651
  def perspective_accept_instance(params):
652
    """Prepare the node to accept an instance.
653

654
    """
655
    instance, info, target = params
656
    instance = objects.Instance.FromDict(instance)
657
    return backend.AcceptInstance(instance, info, target)
658

    
659
  @staticmethod
660
  def perspective_instance_finalize_migration_dst(params):
661
    """Finalize the instance migration on the destination node.
662

663
    """
664
    instance, info, success = params
665
    instance = objects.Instance.FromDict(instance)
666
    return backend.FinalizeMigrationDst(instance, info, success)
667

    
668
  @staticmethod
669
  def perspective_instance_migrate(params):
670
    """Migrates an instance.
671

672
    """
673
    cluster_name, instance, target, live = params
674
    instance = objects.Instance.FromDict(instance)
675
    return backend.MigrateInstance(cluster_name, instance, target, live)
676

    
677
  @staticmethod
678
  def perspective_instance_finalize_migration_src(params):
679
    """Finalize the instance migration on the source node.
680

681
    """
682
    instance, success, live = params
683
    instance = objects.Instance.FromDict(instance)
684
    return backend.FinalizeMigrationSource(instance, success, live)
685

    
686
  @staticmethod
687
  def perspective_instance_get_migration_status(params):
688
    """Reports migration status.
689

690
    """
691
    instance = objects.Instance.FromDict(params[0])
692
    return backend.GetMigrationStatus(instance).ToDict()
693

    
694
  @staticmethod
695
  def perspective_instance_reboot(params):
696
    """Reboot an instance.
697

698
    """
699
    instance = objects.Instance.FromDict(params[0])
700
    reboot_type = params[1]
701
    shutdown_timeout = params[2]
702
    trail = params[3]
703
    _extendReasonTrail(trail, "reboot")
704
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
705
                                  trail)
706

    
707
  @staticmethod
708
  def perspective_instance_balloon_memory(params):
709
    """Modify instance runtime memory.
710

711
    """
712
    instance_dict, memory = params
713
    instance = objects.Instance.FromDict(instance_dict)
714
    return backend.InstanceBalloonMemory(instance, memory)
715

    
716
  @staticmethod
717
  def perspective_instance_info(params):
718
    """Query instance information.
719

720
    """
721
    (instance_name, hypervisor_name, hvparams) = params
722
    return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
723

    
724
  @staticmethod
725
  def perspective_instance_migratable(params):
726
    """Query whether the specified instance can be migrated.
727

728
    """
729
    instance = objects.Instance.FromDict(params[0])
730
    return backend.GetInstanceMigratable(instance)
731

    
732
  @staticmethod
733
  def perspective_all_instances_info(params):
734
    """Query information about all instances.
735

736
    """
737
    (hypervisor_list, all_hvparams) = params
738
    return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
739

    
740
  @staticmethod
741
  def perspective_instance_console_info(params):
742
    """Query information on how to get console access to instances
743

744
    """
745
    return backend.GetInstanceConsoleInfo(params)
746

    
747
  @staticmethod
748
  def perspective_instance_list(params):
749
    """Query the list of running instances.
750

751
    """
752
    (hypervisor_list, hvparams) = params
753
    return backend.GetInstanceList(hypervisor_list, hvparams)
754

    
755
  # node --------------------------
756

    
757
  @staticmethod
758
  def perspective_node_has_ip_address(params):
759
    """Checks if a node has the given ip address.
760

761
    """
762
    return netutils.IPAddress.Own(params[0])
763

    
764
  @staticmethod
765
  def perspective_node_info(params):
766
    """Query node information.
767

768
    """
769
    (storage_units, hv_specs) = params
770
    return backend.GetNodeInfo(storage_units, hv_specs)
771

    
772
  @staticmethod
  def perspective_etc_hosts_modify(params):
    """Modify a node entry in /etc/hosts.

    """
    (mode, host_name, host_ip) = (params[0], params[1], params[2])
    backend.EtcHostsModify(mode, host_name, host_ip)

    return True
780

    
781
  @staticmethod
782
  def perspective_node_verify(params):
783
    """Run a verify sequence on this node.
784

785
    """
786
    (what, cluster_name, hvparams, node_groups, groups_cfg) = params
787
    return backend.VerifyNode(what, cluster_name, hvparams,
788
                              node_groups, groups_cfg)
789

    
790
  @classmethod
791
  def perspective_node_verify_light(cls, params):
792
    """Run a light verify sequence on this node.
793

794
    This call is meant to perform a less strict verification of the node in
795
    certain situations. Right now, it is invoked only when a node is just about
796
    to be added to a cluster, and even then, it performs the same checks as
797
    L{perspective_node_verify}.
798
    """
799
    return cls.perspective_node_verify(params)
800

    
801
  @staticmethod
802
  def perspective_node_start_master_daemons(params):
803
    """Start the master daemons on this node.
804

805
    """
806
    return backend.StartMasterDaemons(params[0])
807

    
808
  @staticmethod
809
  def perspective_node_activate_master_ip(params):
810
    """Activate the master IP on this node.
811

812
    """
813
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
814
    return backend.ActivateMasterIp(master_params, params[1])
815

    
816
  @staticmethod
817
  def perspective_node_deactivate_master_ip(params):
818
    """Deactivate the master IP on this node.
819

820
    """
821
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
822
    return backend.DeactivateMasterIp(master_params, params[1])
823

    
824
  @staticmethod
825
  def perspective_node_stop_master(params):
826
    """Stops master daemons on this node.
827

828
    """
829
    return backend.StopMasterDaemons()
830

    
831
  @staticmethod
832
  def perspective_node_change_master_netmask(params):
833
    """Change the master IP netmask.
834

835
    """
836
    return backend.ChangeMasterNetmask(params[0], params[1], params[2],
837
                                       params[3])
838

    
839
  @staticmethod
840
  def perspective_node_leave_cluster(params):
841
    """Cleanup after leaving a cluster.
842

843
    """
844
    return backend.LeaveCluster(params[0])
845

    
846
  @staticmethod
847
  def perspective_node_volumes(params):
848
    """Query the list of all logical volume groups.
849

850
    """
851
    return backend.NodeVolumes()
852

    
853
  @staticmethod
854
  def perspective_node_demote_from_mc(params):
855
    """Demote a node from the master candidate role.
856

857
    """
858
    return backend.DemoteFromMC()
859

    
860
  @staticmethod
861
  def perspective_node_powercycle(params):
862
    """Tries to powercycle the node.
863

864
    """
865
    (hypervisor_type, hvparams) = params
866
    return backend.PowercycleNode(hypervisor_type, hvparams)
867

    
868
  @staticmethod
869
  def perspective_node_configure_ovs(params):
870
    """Sets up OpenvSwitch on the node.
871

872
    """
873
    (ovs_name, ovs_link) = params
874
    return backend.ConfigureOVS(ovs_name, ovs_link)
875

    
876
  @staticmethod
877
  def perspective_node_crypto_tokens(params):
878
    """Gets the node's public crypto tokens.
879

880
    """
881
    token_requests = params[0]
882
    return backend.GetCryptoTokens(token_requests)
883

    
884
  # cluster --------------------------
885

    
886
  @staticmethod
887
  def perspective_version(params):
888
    """Query version information.
889

890
    """
891
    return constants.PROTOCOL_VERSION
892

    
893
  @staticmethod
894
  def perspective_upload_file(params):
895
    """Upload a file.
896

897
    Note that the backend implementation imposes strict rules on which
898
    files are accepted.
899

900
    """
901
    return backend.UploadFile(*(params[0]))
902

    
903
  @staticmethod
904
  def perspective_upload_file_single(params):
905
    """Upload a file.
906

907
    Note that the backend implementation imposes strict rules on which
908
    files are accepted.
909

910
    """
911
    return backend.UploadFile(*params)
912

    
913
  @staticmethod
914
  def perspective_master_node_name(params):
915
    """Returns the master node name.
916

917
    """
918
    return backend.GetMasterNodeName()
919

    
920
  @staticmethod
  def perspective_run_oob(params):
    """Runs oob on node.

    """
    output = backend.RunOob(params[0], params[1], params[2], params[3])
    # An empty output means there is nothing to deserialize
    return serializer.LoadJson(output) if output else None
931

    
932
  @staticmethod
933
  def perspective_restricted_command(params):
934
    """Runs a restricted command.
935

936
    """
937
    (cmd, ) = params
938

    
939
    return backend.RunRestrictedCmd(cmd)
940

    
941
  @staticmethod
942
  def perspective_write_ssconf_files(params):
943
    """Write ssconf files.
944

945
    """
946
    (values,) = params
947
    return ssconf.WriteSsconfFiles(values)
948

    
949
  @staticmethod
950
  def perspective_get_watcher_pause(params):
951
    """Get watcher pause end.
952

953
    """
954
    return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
955

    
956
  @staticmethod
957
  def perspective_set_watcher_pause(params):
958
    """Set watcher pause.
959

960
    """
961
    (until, ) = params
962
    return backend.SetWatcherPause(until)
963

    
964
  # os -----------------------
965

    
966
  @staticmethod
967
  def perspective_os_diagnose(params):
968
    """Query detailed information about existing OSes.
969

970
    """
971
    return backend.DiagnoseOS()
972

    
973
  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    os_obj = backend.OSFromDisk(params[0])
    return os_obj.ToDict()
981

    
982
  @staticmethod
983
  def perspective_os_validate(params):
984
    """Run a given OS' validation routine.
985

986
    """
987
    required, name, checks, params = params
988
    return backend.ValidateOS(required, name, checks, params)
989

    
990
  # extstorage -----------------------
991

    
992
  @staticmethod
993
  def perspective_extstorage_diagnose(params):
994
    """Query detailed information about existing extstorage providers.
995

996
    """
997
    return backend.DiagnoseExtStorage()
998

    
999
  # hooks -----------------------
1000

    
1001
  @staticmethod
  def perspective_hooks_runner(params):
    """Runs hook scripts for a given hooks path and phase.

    @type params: list
    @param params: (hooks path, phase, environment dict) tuple

    """
    (hpath, phase, env) = params
    return backend.HooksRunner().RunHooks(hpath, phase, env)

  # iallocator -----------------
  @staticmethod
  def perspective_iallocator_runner(params):
    """Runs an iallocator script.

    @type params: list
    @param params: (script name, input data, parameter dict) tuple; the
        parameter dict is converted to ``--key=value`` command-line options

    """
    (name, idata, ial_params_dict) = params
    # Build the command-line option list idiomatically instead of
    # append-in-a-loop with string concatenation
    ial_params = ["--%s=%s" % (option, value)
                  for (option, value) in ial_params_dict.items()]
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata, ial_params)

  # test -----------------------
  @staticmethod
  def perspective_test_delay(params):
    """Sleeps for a requested duration (testing aid).

    @type params: list
    @param params: first element is the delay duration in seconds
    @raise backend.RPCFail: if the delay could not be performed

    """
    delay_seconds = params[0]
    (status, rval) = utils.TestDelay(delay_seconds)
    if status:
      return rval
    raise backend.RPCFail(rval)

  # file storage ---------------
  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Creates the given file storage directory.

    @type params: list
    @param params: first element is the directory path

    """
    storage_dir = params[0]
    return backend.CreateFileStorageDir(storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Removes the given file storage directory.

    @type params: list
    @param params: first element is the directory path

    """
    storage_dir = params[0]
    return backend.RemoveFileStorageDir(storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Renames a file storage directory.

    @type params: list
    @param params: first element is the old path, second the new path

    """
    old_dir = params[0]
    new_dir = params[1]
    return backend.RenameFileStorageDir(old_dir, new_dir)

  # jobs ------------------------
  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Updates a job queue file on this node.

    @type params: list
    @param params: (file name, file content) tuple

    """
    file_name, content = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purges the node's job queue.

    @return: the result of L{backend.JobQueuePurge}

    """
    # No parameters are needed; the queue lock is held via the decorator
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Renames one or more job queue files.

    @type params: list
    @param params: first element is a list of (old, new) name pairs
    @return: list with the per-file rename results

    """
    # TODO: What if a file fails to rename?
    rename_pairs = params[0]
    return [backend.JobQueueRename(old_name, new_name)
            for (old_name, new_name) in rename_pairs]

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_set_drain_flag(params):
    """Sets or clears the job queue's drain flag.

    @type params: list
    @param params: single-element list holding the boolean flag value

    """
    (drain_flag, ) = params
    return jstore.SetDrainFlag(drain_flag)

  # hypervisor ---------------
  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validates hypervisor parameters on this node.

    @type params: list
    @param params: (hypervisor name, hypervisor parameters) tuple

    """
    hvname, hvparams = params
    return backend.ValidateHVParams(hvname, hvparams)

  # Crypto
  @staticmethod
  def perspective_x509_cert_create(params):
    """Creates a new X509 certificate for SSL/TLS.

    @type params: list
    @param params: single-element list holding the validity in seconds

    """
    (cert_validity, ) = params
    return backend.CreateX509Certificate(cert_validity)

  @staticmethod
  def perspective_x509_cert_remove(params):
    """Removes an existing X509 certificate.

    @type params: list
    @param params: single-element list holding the certificate name

    """
    (cert_name, ) = params
    return backend.RemoveX509Certificate(cert_name)

  # Import and export
  @staticmethod
  def perspective_import_start(params):
    """Starts an import daemon for an instance.

    @type params: list
    @param params: (serialized options, serialized instance, component,
        (destination, destination arguments)) tuple

    """
    (opts_s, instance, component, (dest, dest_args)) = params

    # Deserialize the RPC payload into real objects before handing off
    opts = objects.ImportExportOptions.FromDict(opts_s)
    instance_obj = objects.Instance.FromDict(instance)
    dest_io = _DecodeImportExportIO(dest, dest_args)

    # Imports have no remote host/port, hence the two None arguments
    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None, instance_obj,
                                           component, dest, dest_io)

  @staticmethod
  def perspective_export_start(params):
    """Starts an export daemon for an instance.

    @type params: list
    @param params: (serialized options, host, port, serialized instance,
        component, (source, source arguments)) tuple

    """
    (opts_s, host, port, instance, component, (source, source_args)) = params

    # Deserialize the RPC payload into real objects before handing off
    opts = objects.ImportExportOptions.FromDict(opts_s)
    instance_obj = objects.Instance.FromDict(instance)
    source_io = _DecodeImportExportIO(source, source_args)

    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port, instance_obj,
                                           component, source, source_io)

  @staticmethod
  def perspective_impexp_status(params):
    """Retrieves the status of import/export daemons.

    @type params: list
    @param params: first element identifies the daemon(s) to query

    """
    daemon_names = params[0]
    return backend.GetImportExportStatus(daemon_names)

  @staticmethod
  def perspective_impexp_abort(params):
    """Aborts a running import or export.

    @type params: list
    @param params: first element identifies the daemon to abort

    """
    daemon_name = params[0]
    return backend.AbortImportExport(daemon_name)

  @staticmethod
  def perspective_impexp_cleanup(params):
    """Cleans up after a finished import or export.

    @type params: list
    @param params: first element identifies the daemon to clean up

    """
    daemon_name = params[0]
    return backend.CleanupImportExport(daemon_name)


def CheckNoded(_, args):
  """Initial sanity checks; exits with a failure code when they fail.

  Verifies that no positional arguments were given and that the
  "string-escape" codec (needed elsewhere in the daemon) is available.

  @param args: positional command-line arguments (must be empty)

  """
  if args: # noded doesn't take any arguments
    sys.stderr.write("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]\n" %
                     sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    codecs.lookup("string-escape")
  except LookupError:
    sys.stderr.write("Can't load the string-escape code which is part"
                     " of the Python installation. Is your installation"
                     " complete/correct? Aborting.\n")
    sys.exit(constants.EXIT_FAILURE)


def SSLVerifyPeer(conn, cert, errnum, errdepth, ok):
  """Callback function to verify a peer against the candidate cert map.

  Note that we have a chicken-and-egg problem during cluster init and upgrade.
  This method checks whether the incoming connection comes from a master
  candidate by comparing it to the master certificate map in the cluster
  configuration. However, during cluster init and cluster upgrade there
  are various RPC calls done to the master node itself, before the candidate
  certificate list is established and the cluster configuration is written.
  In this case, we cannot check against the master candidate map.

  This problem is solved by checking whether the candidate map is empty. An
  initialized 2.11 or higher cluster has at least one entry for the master
  node in the candidate map. If the map is empty, we know that we are still
  in the bootstrap/upgrade phase. In this case, we read the server certificate
  digest and compare it to the incoming request.

  This means that after an upgrade of Ganeti, the system continues to operate
  like before, using server certificates only. After the client certificates
  are generated with ``gnt-cluster renew-crypto --new-node-certificates``,
  RPC communication is switched to using client certificates and the trick of
  using server certificates does not work anymore.

  @type conn: C{OpenSSL.SSL.Connection}
  @param conn: the OpenSSL connection object
  @type cert: C{OpenSSL.X509}
  @param cert: the peer's SSL certificate
  @rtype: bool
  @return: whether the peer's SHA1 certificate digest is found in the
      candidate map (or matches the node's own server certificate while
      bootstrapping)

  """
  # some parameters are unused, but this is the API
  # pylint: disable=W0613
  _BOOTSTRAP = "bootstrap"
  sstore = ssconf.SimpleStore()
  try:
    candidate_certs = sstore.GetMasterCandidatesCertMap()
  except errors.ConfigurationError:
    # No cluster configuration yet: fall back to the bootstrap path below
    logging.info("No candidate certificates found. Switching to "
                 "bootstrap/update mode.")
    candidate_certs = None
  if not candidate_certs:
    # Bootstrap/upgrade phase: accept only the node's own server certificate
    candidate_certs = {
      _BOOTSTRAP: utils.GetCertificateDigest(
        cert_filename=pathutils.NODED_CERT_FILE)}
  return cert.digest("sha1") in candidate_certs.values()
  # pylint: enable=W0613


def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  Sets up memory locking (if requested and possible), SSL parameters,
  the job queue lock and the HTTP server serving node RPC requests.

  @param options: parsed command-line options
  @return: (mainloop, server) tuple for L{ExecNoded}

  """
  # Default executor; upgraded to the mlock-ed variant below when possible
  request_executor_class = http.server.HttpServerRequestExecutor
  if options.mlock:
    try:
      utils.Mlockall()
      request_executor_class = MlockallRequestExecutor
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")

  # Read SSL certificate
  ssl_params = None
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)

  queue_err = _PrepareQueueLock()
  if queue_err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s",
                     queue_err)

  handler = NodeRequestHandler()
  mainloop = daemon.Mainloop()

  server = \
    http.server.HttpServer(mainloop, options.bind_address, options.port,
                           handler, ssl_params=ssl_params, ssl_verify_peer=True,
                           request_executor_class=request_executor_class,
                           ssl_verify_callback=SSLVerifyPeer)
  server.Start()

  return (mainloop, server)


def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  @param prep_data: (mainloop, server) tuple produced by L{PrepNoded}

  """
  mainloop, server = prep_data
  try:
    mainloop.Run()
  finally:
    # Always shut the HTTP server down, even if the mainloop raised
    server.Stop()


def Main():
  """Main function for the node daemon.

  Builds the option parser and delegates the daemon lifecycle to
  L{daemon.GenericMain}.

  """
  opt_parser = OptionParser(description="Ganeti node daemon",
                            usage=("%prog [-f] [-d] [-p port] [-b ADDRESS]"
                                   " [-i INTERFACE]"),
                            version="%%prog (ganeti) %s" %
                            constants.RELEASE_VERSION)
  opt_parser.add_option("--no-mlock", dest="mlock", action="store_false",
                        default=True,
                        help="Do not mlock the node memory in ram")

  daemon.GenericMain(constants.NODED, opt_parser, CheckNoded, PrepNoded,
                     ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True,
                     warn_breach=True)