Statistics
| Branch: | Tag: | Revision:

root / lib / server / noded.py @ b3590640

History | View | Annotate | Download (36.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
33
import sys
34
import logging
35
import signal
36
import codecs
37

    
38
from optparse import OptionParser
39

    
40
from ganeti import backend
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import errors
44
from ganeti import jstore
45
from ganeti import daemon
46
from ganeti import http
47
from ganeti import utils
48
from ganeti.storage import container
49
from ganeti import serializer
50
from ganeti import netutils
51
from ganeti import pathutils
52
from ganeti import ssconf
53

    
54
import ganeti.http.server # pylint: disable=W0611
55

    
56

    
57
queue_lock = None
58

    
59

    
60
def _extendReasonTrail(trail, source, reason=""):
  """Append a noded-sourced entry to the given reason trail.

  The appended entry names the noded functionality (C{source}) together
  with the optional free-form C{reason} and a nanosecond timestamp.

  """
  assert trail is not None
  entry = ("%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source),
           reason,
           utils.EpochNano())
  trail.append(entry)
68

    
69

    
70
def _PrepareQueueLock():
  """Try to prepare the queue lock.

  Initializes the module-level C{queue_lock} on first use; subsequent
  calls are no-ops.

  @return: None for success, otherwise an exception object

  """
  global queue_lock # pylint: disable=W0603

  # Already initialized by an earlier call
  if queue_lock is not None:
    return None

  # Prepare job queue
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError as err:
    # Modernized from the Python-2-only "except EnvironmentError, err"
    # comma syntax; the "as" form works on Python 2.6+ and 3.x alike.
    # Errors are returned (not raised) so callers can decide how to react.
    return err
87

    
88

    
89
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Ensures the queue lock is initialized and held exclusively around the
  wrapped call, and always released afterwards.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Several children may run at the same time, so the lock is taken in
    # exclusive, blocking mode, waiting up to QUEUE_LOCK_TIMEOUT seconds.
    init_error = _PrepareQueueLock()
    if init_error is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      queue_lock.Unlock()

  return wrapper
108

    
109

    
110
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  For raw-disk and script I/O types the first argument is deserialized
  into a L{objects.Disk}; any other type is passed through unchanged.

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, )

  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, ieioargs[1])

  # All other I/O types need no decoding
  return ieioargs
123

    
124

    
125
def _DefaultAlternative(value, default):
126
  """Returns value or, if evaluating to False, a default value.
127

128
  Returns the given value, unless it evaluates to False. In the latter case the
129
  default value is returned.
130

131
  @param value: Value to return if it doesn't evaluate to False
132
  @param default: Default value
133
  @return: Given value or the default
134

135
  """
136
  if value:
137
    return value
138

    
139
  return default
140

    
141

    
142
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Subclass ensuring request handlers are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    # Pin this process' memory before doing anything else, so the daemon
    # cannot be swapped out while serving requests
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
150

    
151

    
152
class NodeRequestHandler(http.server.HttpServerHandler):
153
  """The server implementation.
154

155
  This class holds all methods exposed over the RPC interface.
156

157
  """
158
  # too many public methods, and unused args - all methods get params
159
  # due to the API
160
  # pylint: disable=R0904,W0613
161
  def __init__(self):
    http.server.HttpServerHandler.__init__(self)
    # Remember our own PID so request handlers can signal the daemon
    # process itself (e.g. SIGTERM on QuitGanetiException)
    self.noded_pid = os.getpid()
164

    
165
  def HandleRequest(self, req):
    """Handle a request.

    Dispatches a POST request to the matching C{perspective_*} method
    (derived from the request path) and returns the JSON-encoded result
    as a C{(success, payload)} tuple.

    @raise http.HttpBadRequest: if the request method is not POST
    @raise http.HttpNotFound: if no handler matches the request path

    """

    if req.request_method.upper() != http.HTTP_POST:
      raise http.HttpBadRequest("Only the POST method is supported")

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    # NOTE: the "except X, err" comma syntax below was modernized to
    # "except X as err", which is valid on Python 2.6+ and required on 3.x
    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail as err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException as err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception as err: # pylint: disable=W0703
      # Catch-all boundary: log the traceback and report the failure to
      # the caller instead of letting the daemon thread die
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)
202

    
203
  # the new block devices  --------------------------
204

    
205
  @staticmethod
  def perspective_blockdev_create(params):
    """Deserializes a disk description and creates the block device.

    @raise ValueError: if the disk description cannot be deserialized

    """
    (bdev_s, size, owner, on_primary, info, excl_stor) = params
    disk = objects.Disk.FromDict(bdev_s)
    if disk is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(disk, size, owner, on_primary, info,
                                  excl_stor)
216

    
217
  @staticmethod
218
  def perspective_blockdev_pause_resume_sync(params):
219
    """Pause/resume sync of a block device.
220

221
    """
222
    disks_s, pause = params
223
    disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
224
    return backend.BlockdevPauseResumeSync(disks, pause)
225

    
226
  @staticmethod
227
  def perspective_blockdev_wipe(params):
228
    """Wipe a block device.
229

230
    """
231
    bdev_s, offset, size = params
232
    bdev = objects.Disk.FromDict(bdev_s)
233
    return backend.BlockdevWipe(bdev, offset, size)
234

    
235
  @staticmethod
236
  def perspective_blockdev_remove(params):
237
    """Remove a block device.
238

239
    """
240
    bdev_s = params[0]
241
    bdev = objects.Disk.FromDict(bdev_s)
242
    return backend.BlockdevRemove(bdev)
243

    
244
  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a list of block devices.

    Each entry of C{params[0]} is a (serialized disk, new unique id) pair.
    (The previous docstring said "Remove", copied from the sibling method;
    this method renames devices.)

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
    return backend.BlockdevRename(devlist)
251

    
252
  @staticmethod
253
  def perspective_blockdev_assemble(params):
254
    """Assemble a block device.
255

256
    """
257
    bdev_s, owner, on_primary, idx = params
258
    bdev = objects.Disk.FromDict(bdev_s)
259
    if bdev is None:
260
      raise ValueError("can't unserialize data!")
261
    return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
262

    
263
  @staticmethod
264
  def perspective_blockdev_shutdown(params):
265
    """Shutdown a block device.
266

267
    """
268
    bdev_s = params[0]
269
    bdev = objects.Disk.FromDict(bdev_s)
270
    if bdev is None:
271
      raise ValueError("can't unserialize data!")
272
    return backend.BlockdevShutdown(bdev)
273

    
274
  @staticmethod
275
  def perspective_blockdev_addchildren(params):
276
    """Add a child to a mirror device.
277

278
    Note: this is only valid for mirror devices. It's the caller's duty
279
    to send a correct disk, otherwise we raise an error.
280

281
    """
282
    bdev_s, ndev_s = params
283
    bdev = objects.Disk.FromDict(bdev_s)
284
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
285
    if bdev is None or ndevs.count(None) > 0:
286
      raise ValueError("can't unserialize data!")
287
    return backend.BlockdevAddchildren(bdev, ndevs)
288

    
289
  @staticmethod
290
  def perspective_blockdev_removechildren(params):
291
    """Remove a child from a mirror device.
292

293
    This is only valid for mirror devices, of course. It's the callers
294
    duty to send a correct disk, otherwise we raise an error.
295

296
    """
297
    bdev_s, ndev_s = params
298
    bdev = objects.Disk.FromDict(bdev_s)
299
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
300
    if bdev is None or ndevs.count(None) > 0:
301
      raise ValueError("can't unserialize data!")
302
    return backend.BlockdevRemovechildren(bdev, ndevs)
303

    
304
  @staticmethod
305
  def perspective_blockdev_getmirrorstatus(params):
306
    """Return the mirror status for a list of disks.
307

308
    """
309
    disks = [objects.Disk.FromDict(dsk_s)
310
             for dsk_s in params[0]]
311
    return [status.ToDict()
312
            for status in backend.BlockdevGetmirrorstatus(disks)]
313

    
314
  @staticmethod
315
  def perspective_blockdev_getmirrorstatus_multi(params):
316
    """Return the mirror status for a list of disks.
317

318
    """
319
    (node_disks, ) = params
320

    
321
    disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
322

    
323
    result = []
324

    
325
    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
326
      if success:
327
        result.append((success, status.ToDict()))
328
      else:
329
        result.append((success, status))
330

    
331
    return result
332

    
333
  @staticmethod
334
  def perspective_blockdev_find(params):
335
    """Expose the FindBlockDevice functionality for a disk.
336

337
    This will try to find but not activate a disk.
338

339
    """
340
    disk = objects.Disk.FromDict(params[0])
341

    
342
    result = backend.BlockdevFind(disk)
343
    if result is None:
344
      return None
345

    
346
    return result.ToDict()
347

    
348
  @staticmethod
349
  def perspective_blockdev_snapshot(params):
350
    """Create a snapshot device.
351

352
    Note that this is only valid for LVM disks, if we get passed
353
    something else we raise an exception. The snapshot device can be
354
    remove by calling the generic block device remove call.
355

356
    """
357
    cfbd = objects.Disk.FromDict(params[0])
358
    return backend.BlockdevSnapshot(cfbd)
359

    
360
  @staticmethod
  def perspective_blockdev_grow(params):
    """Grows a stack of block devices.

    @raise ValueError: if fewer than five parameters were received
      (indicating a request from an older master)

    """
    if len(params) < 5:
      raise ValueError("Received only %s parameters in blockdev_grow,"
                       " old master?" % len(params))
    # Extra trailing parameters, if any, are deliberately ignored
    (disk_dict, amount, dryrun, backingstore, excl_stor) = params[:5]
    disk = objects.Disk.FromDict(disk_dict)
    return backend.BlockdevGrow(disk, amount, dryrun, backingstore, excl_stor)
374

    
375
  @staticmethod
376
  def perspective_blockdev_close(params):
377
    """Closes the given block devices.
378

379
    """
380
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
381
    return backend.BlockdevClose(params[0], disks)
382

    
383
  @staticmethod
384
  def perspective_blockdev_getdimensions(params):
385
    """Compute the sizes of the given block devices.
386

387
    """
388
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
389
    return backend.BlockdevGetdimensions(disks)
390

    
391
  @staticmethod
392
  def perspective_blockdev_setinfo(params):
393
    """Sets metadata information on the given block device.
394

395
    """
396
    (disk, info) = params
397
    disk = objects.Disk.FromDict(disk)
398
    return backend.BlockdevSetInfo(disk, info)
399

    
400
  # blockdev/drbd specific methods ----------
401

    
402
  @staticmethod
403
  def perspective_drbd_disconnect_net(params):
404
    """Disconnects the network connection of drbd disks.
405

406
    Note that this is only valid for drbd disks, so the members of the
407
    disk list must all be drbd devices.
408

409
    """
410
    (disks,) = params
411
    disks = [objects.Disk.FromDict(disk) for disk in disks]
412
    return backend.DrbdDisconnectNet(disks)
413

    
414
  @staticmethod
415
  def perspective_drbd_attach_net(params):
416
    """Attaches the network connection of drbd disks.
417

418
    Note that this is only valid for drbd disks, so the members of the
419
    disk list must all be drbd devices.
420

421
    """
422
    disks, instance_name, multimaster = params
423
    disks = [objects.Disk.FromDict(disk) for disk in disks]
424
    return backend.DrbdAttachNet(disks, instance_name, multimaster)
425

    
426
  @staticmethod
427
  def perspective_drbd_wait_sync(params):
428
    """Wait until DRBD disks are synched.
429

430
    Note that this is only valid for drbd disks, so the members of the
431
    disk list must all be drbd devices.
432

433
    """
434
    (disks,) = params
435
    disks = [objects.Disk.FromDict(disk) for disk in disks]
436
    return backend.DrbdWaitSync(disks)
437

    
438
  @staticmethod
439
  def perspective_drbd_needs_activation(params):
440
    """Checks if the drbd devices need activation
441

442
    Note that this is only valid for drbd disks, so the members of the
443
    disk list must all be drbd devices.
444

445
    """
446
    (disks,) = params
447
    disks = [objects.Disk.FromDict(disk) for disk in disks]
448
    return backend.DrbdNeedsActivation(disks)
449

    
450
  @staticmethod
451
  def perspective_drbd_helper(_):
452
    """Query drbd helper.
453

454
    """
455
    return backend.GetDrbdUsermodeHelper()
456

    
457
  # export/import  --------------------------
458

    
459
  @staticmethod
460
  def perspective_finalize_export(params):
461
    """Expose the finalize export functionality.
462

463
    """
464
    instance = objects.Instance.FromDict(params[0])
465

    
466
    snap_disks = []
467
    for disk in params[1]:
468
      if isinstance(disk, bool):
469
        snap_disks.append(disk)
470
      else:
471
        snap_disks.append(objects.Disk.FromDict(disk))
472

    
473
    return backend.FinalizeExport(instance, snap_disks)
474

    
475
  @staticmethod
476
  def perspective_export_info(params):
477
    """Query information about an existing export on this node.
478

479
    The given path may not contain an export, in which case we return
480
    None.
481

482
    """
483
    path = params[0]
484
    return backend.ExportInfo(path)
485

    
486
  @staticmethod
487
  def perspective_export_list(params):
488
    """List the available exports on this node.
489

490
    Note that as opposed to export_info, which may query data about an
491
    export in any path, this only queries the standard Ganeti path
492
    (pathutils.EXPORT_DIR).
493

494
    """
495
    return backend.ListExports()
496

    
497
  @staticmethod
498
  def perspective_export_remove(params):
499
    """Remove an export.
500

501
    """
502
    export = params[0]
503
    return backend.RemoveExport(export)
504

    
505
  # block device ---------------------
506
  @staticmethod
507
  def perspective_bdev_sizes(params):
508
    """Query the list of block devices
509

510
    """
511
    devices = params[0]
512
    return backend.GetBlockDevSizes(devices)
513

    
514
  # volume  --------------------------
515

    
516
  @staticmethod
517
  def perspective_lv_list(params):
518
    """Query the list of logical volumes in a given volume group.
519

520
    """
521
    vgname = params[0]
522
    return backend.GetVolumeList(vgname)
523

    
524
  @staticmethod
525
  def perspective_vg_list(params):
526
    """Query the list of volume groups.
527

528
    """
529
    return backend.ListVolumeGroups()
530

    
531
  # Storage --------------------------
532

    
533
  @staticmethod
534
  def perspective_storage_list(params):
535
    """Get list of storage units.
536

537
    """
538
    (su_name, su_args, name, fields) = params
539
    return container.GetStorage(su_name, *su_args).List(name, fields)
540

    
541
  @staticmethod
542
  def perspective_storage_modify(params):
543
    """Modify a storage unit.
544

545
    """
546
    (su_name, su_args, name, changes) = params
547
    return container.GetStorage(su_name, *su_args).Modify(name, changes)
548

    
549
  @staticmethod
550
  def perspective_storage_execute(params):
551
    """Execute an operation on a storage unit.
552

553
    """
554
    (su_name, su_args, name, op) = params
555
    return container.GetStorage(su_name, *su_args).Execute(name, op)
556

    
557
  # bridge  --------------------------
558

    
559
  @staticmethod
560
  def perspective_bridges_exist(params):
561
    """Check if all bridges given exist on this node.
562

563
    """
564
    bridges_list = params[0]
565
    return backend.BridgesExist(bridges_list)
566

    
567
  # instance  --------------------------
568

    
569
  @staticmethod
570
  def perspective_instance_os_add(params):
571
    """Install an OS on a given instance.
572

573
    """
574
    inst_s = params[0]
575
    inst = objects.Instance.FromDict(inst_s)
576
    reinstall = params[1]
577
    debug = params[2]
578
    return backend.InstanceOsAdd(inst, reinstall, debug)
579

    
580
  @staticmethod
581
  def perspective_instance_run_rename(params):
582
    """Runs the OS rename script for an instance.
583

584
    """
585
    inst_s, old_name, debug = params
586
    inst = objects.Instance.FromDict(inst_s)
587
    return backend.RunRenameInstance(inst, old_name, debug)
588

    
589
  @staticmethod
590
  def perspective_instance_shutdown(params):
591
    """Shutdown an instance.
592

593
    """
594
    instance = objects.Instance.FromDict(params[0])
595
    timeout = params[1]
596
    trail = params[2]
597
    _extendReasonTrail(trail, "shutdown")
598
    return backend.InstanceShutdown(instance, timeout, trail)
599

    
600
  @staticmethod
601
  def perspective_instance_start(params):
602
    """Start an instance.
603

604
    """
605
    (instance_name, startup_paused, trail) = params
606
    instance = objects.Instance.FromDict(instance_name)
607
    _extendReasonTrail(trail, "start")
608
    return backend.StartInstance(instance, startup_paused, trail)
609

    
610
  @staticmethod
  def perspective_hotplug_device(params):
    """Hotplugs a device to a running instance.

    @raise ValueError: if the device type is not a supported hotplug target

    """
    (idict, action, dev_type, ddict, extra, seq) = params
    instance = objects.Instance.FromDict(idict)
    if dev_type == constants.HOTPLUG_TARGET_DISK:
      device = objects.Disk.FromDict(ddict)
    elif dev_type == constants.HOTPLUG_TARGET_NIC:
      device = objects.NIC.FromDict(ddict)
    else:
      # Previously this branch only asserted membership in
      # HOTPLUG_ALL_TARGETS and then fell through to the return statement,
      # where the unbound "device" name raised a confusing
      # UnboundLocalError (and under "python -O" the assert vanished
      # entirely). Fail explicitly instead.
      raise ValueError("Invalid hotplug device type: %s" % dev_type)
    return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)
624

    
625
  @staticmethod
626
  def perspective_hotplug_supported(params):
627
    """Checks if hotplug is supported.
628

629
    """
630
    instance = objects.Instance.FromDict(params[0])
631
    return backend.HotplugSupported(instance)
632

    
633
  @staticmethod
634
  def perspective_migration_info(params):
635
    """Gather information about an instance to be migrated.
636

637
    """
638
    instance = objects.Instance.FromDict(params[0])
639
    return backend.MigrationInfo(instance)
640

    
641
  @staticmethod
642
  def perspective_accept_instance(params):
643
    """Prepare the node to accept an instance.
644

645
    """
646
    instance, info, target = params
647
    instance = objects.Instance.FromDict(instance)
648
    return backend.AcceptInstance(instance, info, target)
649

    
650
  @staticmethod
651
  def perspective_instance_finalize_migration_dst(params):
652
    """Finalize the instance migration on the destination node.
653

654
    """
655
    instance, info, success = params
656
    instance = objects.Instance.FromDict(instance)
657
    return backend.FinalizeMigrationDst(instance, info, success)
658

    
659
  @staticmethod
660
  def perspective_instance_migrate(params):
661
    """Migrates an instance.
662

663
    """
664
    cluster_name, instance, target, live = params
665
    instance = objects.Instance.FromDict(instance)
666
    return backend.MigrateInstance(cluster_name, instance, target, live)
667

    
668
  @staticmethod
669
  def perspective_instance_finalize_migration_src(params):
670
    """Finalize the instance migration on the source node.
671

672
    """
673
    instance, success, live = params
674
    instance = objects.Instance.FromDict(instance)
675
    return backend.FinalizeMigrationSource(instance, success, live)
676

    
677
  @staticmethod
678
  def perspective_instance_get_migration_status(params):
679
    """Reports migration status.
680

681
    """
682
    instance = objects.Instance.FromDict(params[0])
683
    return backend.GetMigrationStatus(instance).ToDict()
684

    
685
  @staticmethod
686
  def perspective_instance_reboot(params):
687
    """Reboot an instance.
688

689
    """
690
    instance = objects.Instance.FromDict(params[0])
691
    reboot_type = params[1]
692
    shutdown_timeout = params[2]
693
    trail = params[3]
694
    _extendReasonTrail(trail, "reboot")
695
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
696
                                  trail)
697

    
698
  @staticmethod
699
  def perspective_instance_balloon_memory(params):
700
    """Modify instance runtime memory.
701

702
    """
703
    instance_dict, memory = params
704
    instance = objects.Instance.FromDict(instance_dict)
705
    return backend.InstanceBalloonMemory(instance, memory)
706

    
707
  @staticmethod
708
  def perspective_instance_info(params):
709
    """Query instance information.
710

711
    """
712
    (instance_name, hypervisor_name, hvparams) = params
713
    return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
714

    
715
  @staticmethod
716
  def perspective_instance_migratable(params):
717
    """Query whether the specified instance can be migrated.
718

719
    """
720
    instance = objects.Instance.FromDict(params[0])
721
    return backend.GetInstanceMigratable(instance)
722

    
723
  @staticmethod
724
  def perspective_all_instances_info(params):
725
    """Query information about all instances.
726

727
    """
728
    (hypervisor_list, all_hvparams) = params
729
    return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
730

    
731
  @staticmethod
732
  def perspective_instance_console_info(params):
733
    """Query information on how to get console access to instances
734

735
    """
736
    return backend.GetInstanceConsoleInfo(params)
737

    
738
  @staticmethod
739
  def perspective_instance_list(params):
740
    """Query the list of running instances.
741

742
    """
743
    (hypervisor_list, hvparams) = params
744
    return backend.GetInstanceList(hypervisor_list, hvparams)
745

    
746
  # node --------------------------
747

    
748
  @staticmethod
749
  def perspective_node_has_ip_address(params):
750
    """Checks if a node has the given ip address.
751

752
    """
753
    return netutils.IPAddress.Own(params[0])
754

    
755
  @staticmethod
756
  def perspective_node_info(params):
757
    """Query node information.
758

759
    """
760
    (storage_units, hv_specs) = params
761
    return backend.GetNodeInfo(storage_units, hv_specs)
762

    
763
  @staticmethod
764
  def perspective_etc_hosts_modify(params):
765
    """Modify a node entry in /etc/hosts.
766

767
    """
768
    backend.EtcHostsModify(params[0], params[1], params[2])
769

    
770
    return True
771

    
772
  @staticmethod
773
  def perspective_node_verify(params):
774
    """Run a verify sequence on this node.
775

776
    """
777
    (what, cluster_name, hvparams, node_groups, groups_cfg) = params
778
    return backend.VerifyNode(what, cluster_name, hvparams,
779
                              node_groups, groups_cfg)
780

    
781
  @classmethod
782
  def perspective_node_verify_light(cls, params):
783
    """Run a light verify sequence on this node.
784

785
    This call is meant to perform a less strict verification of the node in
786
    certain situations. Right now, it is invoked only when a node is just about
787
    to be added to a cluster, and even then, it performs the same checks as
788
    L{perspective_node_verify}.
789
    """
790
    return cls.perspective_node_verify(params)
791

    
792
  @staticmethod
793
  def perspective_node_start_master_daemons(params):
794
    """Start the master daemons on this node.
795

796
    """
797
    return backend.StartMasterDaemons(params[0])
798

    
799
  @staticmethod
800
  def perspective_node_activate_master_ip(params):
801
    """Activate the master IP on this node.
802

803
    """
804
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
805
    return backend.ActivateMasterIp(master_params, params[1])
806

    
807
  @staticmethod
808
  def perspective_node_deactivate_master_ip(params):
809
    """Deactivate the master IP on this node.
810

811
    """
812
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
813
    return backend.DeactivateMasterIp(master_params, params[1])
814

    
815
  @staticmethod
816
  def perspective_node_stop_master(params):
817
    """Stops master daemons on this node.
818

819
    """
820
    return backend.StopMasterDaemons()
821

    
822
  @staticmethod
823
  def perspective_node_change_master_netmask(params):
824
    """Change the master IP netmask.
825

826
    """
827
    return backend.ChangeMasterNetmask(params[0], params[1], params[2],
828
                                       params[3])
829

    
830
  @staticmethod
831
  def perspective_node_leave_cluster(params):
832
    """Cleanup after leaving a cluster.
833

834
    """
835
    return backend.LeaveCluster(params[0])
836

    
837
  @staticmethod
838
  def perspective_node_volumes(params):
839
    """Query the list of all logical volume groups.
840

841
    """
842
    return backend.NodeVolumes()
843

    
844
  @staticmethod
845
  def perspective_node_demote_from_mc(params):
846
    """Demote a node from the master candidate role.
847

848
    """
849
    return backend.DemoteFromMC()
850

    
851
  @staticmethod
852
  def perspective_node_powercycle(params):
853
    """Tries to powercycle the node.
854

855
    """
856
    (hypervisor_type, hvparams) = params
857
    return backend.PowercycleNode(hypervisor_type, hvparams)
858

    
859
  @staticmethod
860
  def perspective_node_configure_ovs(params):
861
    """Sets up OpenvSwitch on the node.
862

863
    """
864
    (ovs_name, ovs_link) = params
865
    return backend.ConfigureOVS(ovs_name, ovs_link)
866

    
867
  @staticmethod
868
  def perspective_node_crypto_tokens(params):
869
    """Gets the node's public crypto tokens.
870

871
    """
872
    token_requests = params[0]
873
    return backend.GetCryptoTokens(token_requests)
874

    
875
  # cluster --------------------------
876

    
877
  @staticmethod
878
  def perspective_version(params):
879
    """Query version information.
880

881
    """
882
    return constants.PROTOCOL_VERSION
883

    
884
  @staticmethod
885
  def perspective_upload_file(params):
886
    """Upload a file.
887

888
    Note that the backend implementation imposes strict rules on which
889
    files are accepted.
890

891
    """
892
    return backend.UploadFile(*(params[0]))
893

    
894
  @staticmethod
895
  def perspective_upload_file_single(params):
896
    """Upload a file.
897

898
    Note that the backend implementation imposes strict rules on which
899
    files are accepted.
900

901
    """
902
    return backend.UploadFile(*params)
903

    
904
  @staticmethod
905
  def perspective_master_node_name(params):
906
    """Returns the master node name.
907

908
    """
909
    return backend.GetMasterNodeName()
910

    
911
  @staticmethod
  def perspective_run_oob(params):
    """Runs an out-of-band command on this node.

    @return: the JSON-decoded command output, or None if there was none

    """
    output = backend.RunOob(params[0], params[1], params[2], params[3])
    return serializer.LoadJson(output) if output else None
922

    
923
  @staticmethod
924
  def perspective_restricted_command(params):
925
    """Runs a restricted command.
926

927
    """
928
    (cmd, ) = params
929

    
930
    return backend.RunRestrictedCmd(cmd)
931

    
932
  @staticmethod
933
  def perspective_write_ssconf_files(params):
934
    """Write ssconf files.
935

936
    """
937
    (values,) = params
938
    return ssconf.WriteSsconfFiles(values)
939

    
940
  @staticmethod
941
  def perspective_get_watcher_pause(params):
942
    """Get watcher pause end.
943

944
    """
945
    return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
946

    
947
  @staticmethod
948
  def perspective_set_watcher_pause(params):
949
    """Set watcher pause.
950

951
    """
952
    (until, ) = params
953
    return backend.SetWatcherPause(until)
954

    
955
  # os -----------------------
956

    
957
  @staticmethod
958
  def perspective_os_diagnose(params):
959
    """Query detailed information about existing OSes.
960

961
    """
962
    return backend.DiagnoseOS()
963

    
964
  @staticmethod
965
  def perspective_os_get(params):
966
    """Query information about a given OS.
967

968
    """
969
    name = params[0]
970
    os_obj = backend.OSFromDisk(name)
971
    return os_obj.ToDict()
972

    
973
  @staticmethod
974
  def perspective_os_validate(params):
975
    """Run a given OS' validation routine.
976

977
    """
978
    required, name, checks, params = params
979
    return backend.ValidateOS(required, name, checks, params)
980

    
981
  # extstorage -----------------------
982

    
983
  @staticmethod
984
  def perspective_extstorage_diagnose(params):
985
    """Query detailed information about existing extstorage providers.
986

987
    """
988
    return backend.DiagnoseExtStorage()
989

    
990
  # hooks -----------------------
991

    
992
  @staticmethod
993
  def perspective_hooks_runner(params):
994
    """Run hook scripts.
995

996
    """
997
    hpath, phase, env = params
998
    hr = backend.HooksRunner()
999
    return hr.RunHooks(hpath, phase, env)
1000

    
1001
  # iallocator -----------------
1002

    
1003
  @staticmethod
  def perspective_iallocator_runner(params):
    """Runs an iallocator script with the given input.

    The parameter dictionary is turned into "--key=value" command-line
    options before invoking the runner.

    """
    (name, idata, ial_params_dict) = params
    ial_params = ["--" + key + "=" + value
                  for (key, value) in ial_params_dict.items()]
    return backend.IAllocatorRunner().Run(name, idata, ial_params)
1014

    
1015
  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Sleep for a requested duration (used for testing only).

    """
    duration = params[0]
    (status, rval) = utils.TestDelay(duration)
    if status:
      return rval
    raise backend.RPCFail(rval)

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the directory used for file-based storage.

    """
    storage_dir = params[0]
    return backend.CreateFileStorageDir(storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the directory used for file-based storage.

    """
    storage_dir = params[0]
    return backend.RemoveFileStorageDir(storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the directory used for file-based storage.

    """
    old_dir = params[0]
    new_dir = params[1]
    return backend.RenameFileStorageDir(old_dir, new_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Rewrite a file in the job queue.

    """
    (queue_file, contents) = params
    return backend.JobQueueUpdate(queue_file, contents)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Remove all files from the job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a set of job queue files.

    """
    # TODO: What if a file fails to rename?
    rename_list = params[0]
    return [backend.JobQueueRename(old, new)
            for (old, new) in rename_list]

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_set_drain_flag(params):
    """Set or clear the job queue's drain flag.

    """
    [flag] = params
    return jstore.SetDrainFlag(flag)

  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate parameters for a given hypervisor.

    """
    (hv_name, hv_params) = params
    return backend.ValidateHVParams(hv_name, hv_params)

  # Crypto

  @staticmethod
  def perspective_x509_cert_create(params):
    """Create a new X509 certificate for SSL/TLS.

    """
    [validity] = params
    return backend.CreateX509Certificate(validity)

  @staticmethod
  def perspective_x509_cert_remove(params):
    """Remove an existing X509 certificate.

    """
    [name] = params
    return backend.RemoveX509Certificate(name)

  # Import and export

  @staticmethod
  def perspective_import_start(params):
    """Start an import daemon.

    """
    (opts_s, instance, component, (dest, dest_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)
    instance_obj = objects.Instance.FromDict(instance)
    dest_io = _DecodeImportExportIO(dest, dest_args)

    # imports have no remote endpoint, hence host/port are None
    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None, instance_obj,
                                           component, dest, dest_io)

  @staticmethod
  def perspective_export_start(params):
    """Start an export daemon.

    """
    (opts_s, host, port, instance, component, (source, source_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)
    instance_obj = objects.Instance.FromDict(instance)
    source_io = _DecodeImportExportIO(source, source_args)

    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port, instance_obj,
                                           component, source, source_io)

  @staticmethod
  def perspective_impexp_status(params):
    """Retrieve the status of an import or export daemon.

    """
    arg = params[0]
    return backend.GetImportExportStatus(arg)

  @staticmethod
  def perspective_impexp_abort(params):
    """Abort a running import or export.

    """
    arg = params[0]
    return backend.AbortImportExport(arg)

  @staticmethod
  def perspective_impexp_cleanup(params):
    """Clean up after a finished import or export.

    """
    arg = params[0]
    return backend.CleanupImportExport(arg)


def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  """
  # noded doesn't take any positional arguments
  if args:
    sys.stderr.write("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]\n" %
                     sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    codecs.lookup("string-escape")
  except LookupError:
    sys.stderr.write("Can't load the string-escape code which is part"
                     " of the Python installation. Is your installation"
                     " complete/correct? Aborting.\n")
    sys.exit(constants.EXIT_FAILURE)


def SSLVerifyPeer(conn, cert, errnum, errdepth, ok):
  """Callback function to verify a peer against the candidate cert map.

  Normally the incoming connection is checked against the master
  candidate certificate map from the cluster configuration.  There is,
  however, a chicken-and-egg problem during cluster init and upgrade:
  RPC calls are made to the master node itself before the candidate
  certificate list is established and the configuration is written.

  Since an initialized 2.11 or higher cluster always has at least the
  master node in the candidate map, an empty (or unreadable) map tells
  us we are still in the bootstrap/upgrade phase.  In that case we fall
  back to comparing the incoming request against the digest of our own
  server certificate, which is how pre-2.11 clusters operated.  After
  client certificates are generated with ``gnt-cluster renew-crypto
  --new-node-certificates``, RPC communication switches to client
  certificates and this server-certificate trick no longer applies.

  @type conn: C{OpenSSL.SSL.Connection}
  @param conn: the OpenSSL connection object
  @type cert: C{OpenSSL.X509}
  @param cert: the peer's SSL certificate

  """
  # some parameters are unused, but this is the API
  # pylint: disable=W0613
  _BOOTSTRAP = "bootstrap"
  sstore = ssconf.SimpleStore()
  try:
    candidate_certs = sstore.GetMasterCandidatesCertMap()
  except errors.ConfigurationError:
    logging.info("No candidate certificates found. Switching to "
                 "bootstrap/update mode.")
    candidate_certs = None
  if not candidate_certs:
    # bootstrap/upgrade phase: accept only our own server certificate
    own_digest = utils.GetCertificateDigest(
      cert_filename=pathutils.NODED_CERT_FILE)
    candidate_certs = {_BOOTSTRAP: own_digest}
  return cert.digest("sha1") in candidate_certs.values()
  # pylint: enable=W0613


def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  """
  # Try to lock the daemon's memory if requested; fall back to the
  # plain request executor when ctypes (and thus mlockall) is missing
  request_executor_class = http.server.HttpServerRequestExecutor
  if options.mlock:
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")
    else:
      request_executor_class = MlockallRequestExecutor

  # Read SSL certificate
  ssl_params = None
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  handler = NodeRequestHandler()

  mainloop = daemon.Mainloop()
  server = \
    http.server.HttpServer(mainloop, options.bind_address, options.port,
                           handler, ssl_params=ssl_params, ssl_verify_peer=True,
                           request_executor_class=request_executor_class,
                           ssl_verify_callback=SSLVerifyPeer)
  server.Start()

  return (mainloop, server)


def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  """
  (loop, node_server) = prep_data
  try:
    loop.Run()
  finally:
    # shut the HTTP server down even if the mainloop terminates abnormally
    node_server.Stop()


def Main():
  """Main function for the node daemon.

  """
  parser = OptionParser(
    description="Ganeti node daemon",
    usage=("%prog [-f] [-d] [-p port] [-b ADDRESS]"
           " [-i INTERFACE]"),
    version="%%prog (ganeti) %s" % constants.RELEASE_VERSION)
  parser.add_option("--no-mlock", dest="mlock", action="store_false",
                    default=True,
                    help="Do not mlock the node memory in ram")

  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True, warn_breach=True)