Statistics
| Branch: | Tag: | Revision:

root / lib / server / noded.py @ ed748771

History | View | Annotate | Download (35.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
33
import sys
34
import logging
35
import signal
36
import codecs
37

    
38
from optparse import OptionParser
39

    
40
from ganeti import backend
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import errors
44
from ganeti import jstore
45
from ganeti import daemon
46
from ganeti import http
47
from ganeti import utils
48
from ganeti.storage import container
49
from ganeti import serializer
50
from ganeti import netutils
51
from ganeti import pathutils
52
from ganeti import ssconf
53

    
54
import ganeti.http.server # pylint: disable=W0611
55

    
56

    
57
queue_lock = None
58

    
59

    
60
def _extendReasonTrail(trail, source, reason=""):
61
  """Extend the reason trail with noded information
62

63
  The trail is extended by appending the name of the noded functionality
64
  """
65
  assert trail is not None
66
  trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
67
  trail.append((trail_source, reason, utils.EpochNano()))
68

    
69

    
70
def _PrepareQueueLock():
71
  """Try to prepare the queue lock.
72

73
  @return: None for success, otherwise an exception object
74

75
  """
76
  global queue_lock # pylint: disable=W0603
77

    
78
  if queue_lock is not None:
79
    return None
80

    
81
  # Prepare job queue
82
  try:
83
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
84
    return None
85
  except EnvironmentError, err:
86
    return err
87

    
88

    
89
def _RequireJobQueueLock(fn):
90
  """Decorator for job queue manipulating functions.
91

92
  """
93
  QUEUE_LOCK_TIMEOUT = 10
94

    
95
  def wrapper(*args, **kwargs):
96
    # Locking in exclusive, blocking mode because there could be several
97
    # children running at the same time. Waiting up to 10 seconds.
98
    if _PrepareQueueLock() is not None:
99
      raise errors.JobQueueError("Job queue failed initialization,"
100
                                 " cannot update jobs")
101
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
102
    try:
103
      return fn(*args, **kwargs)
104
    finally:
105
      queue_lock.Unlock()
106

    
107
  return wrapper
108

    
109

    
110
def _DecodeImportExportIO(ieio, ieioargs):
111
  """Decodes import/export I/O information.
112

113
  """
114
  if ieio == constants.IEIO_RAW_DISK:
115
    assert len(ieioargs) == 1
116
    return (objects.Disk.FromDict(ieioargs[0]), )
117

    
118
  if ieio == constants.IEIO_SCRIPT:
119
    assert len(ieioargs) == 2
120
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
121

    
122
  return ieioargs
123

    
124

    
125
def _DefaultAlternative(value, default):
126
  """Returns value or, if evaluating to False, a default value.
127

128
  Returns the given value, unless it evaluates to False. In the latter case the
129
  default value is returned.
130

131
  @param value: Value to return if it doesn't evaluate to False
132
  @param default: Default value
133
  @return: Given value or the default
134

135
  """
136
  if value:
137
    return value
138

    
139
  return default
140

    
141

    
142
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
143
  """Subclass ensuring request handlers are locked in RAM.
144

145
  """
146
  def __init__(self, *args, **kwargs):
147
    utils.Mlockall()
148

    
149
    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
150

    
151

    
152
class NodeRequestHandler(http.server.HttpServerHandler):
153
  """The server implementation.
154

155
  This class holds all methods exposed over the RPC interface.
156

157
  """
158
  # too many public methods, and unused args - all methods get params
159
  # due to the API
160
  # pylint: disable=R0904,W0613
161
  def __init__(self):
162
    http.server.HttpServerHandler.__init__(self)
163
    self.noded_pid = os.getpid()
164

    
165
  def HandleRequest(self, req):
166
    """Handle a request.
167

168
    """
169

    
170
    if req.request_method.upper() != http.HTTP_POST:
171
      raise http.HttpBadRequest("Only the POST method is supported")
172

    
173
    path = req.request_path
174
    if path.startswith("/"):
175
      path = path[1:]
176

    
177
    method = getattr(self, "perspective_%s" % path, None)
178
    if method is None:
179
      raise http.HttpNotFound()
180

    
181
    try:
182
      result = (True, method(serializer.LoadJson(req.request_body)))
183

    
184
    except backend.RPCFail, err:
185
      # our custom failure exception; str(err) works fine if the
186
      # exception was constructed with a single argument, and in
187
      # this case, err.message == err.args[0] == str(err)
188
      result = (False, str(err))
189
    except errors.QuitGanetiException, err:
190
      # Tell parent to quit
191
      logging.info("Shutting down the node daemon, arguments: %s",
192
                   str(err.args))
193
      os.kill(self.noded_pid, signal.SIGTERM)
194
      # And return the error's arguments, which must be already in
195
      # correct tuple format
196
      result = err.args
197
    except Exception, err:
198
      logging.exception("Error in RPC call")
199
      result = (False, "Error while executing backend function: %s" % str(err))
200

    
201
    return serializer.DumpJson(result)
202

    
203
  # the new block devices  --------------------------
204

    
205
  @staticmethod
206
  def perspective_blockdev_create(params):
207
    """Create a block device.
208

209
    """
210
    (bdev_s, size, owner, on_primary, info, excl_stor) = params
211
    bdev = objects.Disk.FromDict(bdev_s)
212
    if bdev is None:
213
      raise ValueError("can't unserialize data!")
214
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
215
                                  excl_stor)
216

    
217
  @staticmethod
218
  def perspective_blockdev_pause_resume_sync(params):
219
    """Pause/resume sync of a block device.
220

221
    """
222
    disks_s, pause = params
223
    disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
224
    return backend.BlockdevPauseResumeSync(disks, pause)
225

    
226
  @staticmethod
227
  def perspective_blockdev_wipe(params):
228
    """Wipe a block device.
229

230
    """
231
    bdev_s, offset, size = params
232
    bdev = objects.Disk.FromDict(bdev_s)
233
    return backend.BlockdevWipe(bdev, offset, size)
234

    
235
  @staticmethod
236
  def perspective_blockdev_remove(params):
237
    """Remove a block device.
238

239
    """
240
    bdev_s = params[0]
241
    bdev = objects.Disk.FromDict(bdev_s)
242
    return backend.BlockdevRemove(bdev)
243

    
244
  @staticmethod
245
  def perspective_blockdev_rename(params):
246
    """Remove a block device.
247

248
    """
249
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
250
    return backend.BlockdevRename(devlist)
251

    
252
  @staticmethod
253
  def perspective_blockdev_assemble(params):
254
    """Assemble a block device.
255

256
    """
257
    bdev_s, owner, on_primary, idx = params
258
    bdev = objects.Disk.FromDict(bdev_s)
259
    if bdev is None:
260
      raise ValueError("can't unserialize data!")
261
    return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
262

    
263
  @staticmethod
264
  def perspective_blockdev_shutdown(params):
265
    """Shutdown a block device.
266

267
    """
268
    bdev_s = params[0]
269
    bdev = objects.Disk.FromDict(bdev_s)
270
    if bdev is None:
271
      raise ValueError("can't unserialize data!")
272
    return backend.BlockdevShutdown(bdev)
273

    
274
  @staticmethod
275
  def perspective_blockdev_addchildren(params):
276
    """Add a child to a mirror device.
277

278
    Note: this is only valid for mirror devices. It's the caller's duty
279
    to send a correct disk, otherwise we raise an error.
280

281
    """
282
    bdev_s, ndev_s = params
283
    bdev = objects.Disk.FromDict(bdev_s)
284
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
285
    if bdev is None or ndevs.count(None) > 0:
286
      raise ValueError("can't unserialize data!")
287
    return backend.BlockdevAddchildren(bdev, ndevs)
288

    
289
  @staticmethod
290
  def perspective_blockdev_removechildren(params):
291
    """Remove a child from a mirror device.
292

293
    This is only valid for mirror devices, of course. It's the callers
294
    duty to send a correct disk, otherwise we raise an error.
295

296
    """
297
    bdev_s, ndev_s = params
298
    bdev = objects.Disk.FromDict(bdev_s)
299
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
300
    if bdev is None or ndevs.count(None) > 0:
301
      raise ValueError("can't unserialize data!")
302
    return backend.BlockdevRemovechildren(bdev, ndevs)
303

    
304
  @staticmethod
305
  def perspective_blockdev_getmirrorstatus(params):
306
    """Return the mirror status for a list of disks.
307

308
    """
309
    disks = [objects.Disk.FromDict(dsk_s)
310
             for dsk_s in params[0]]
311
    return [status.ToDict()
312
            for status in backend.BlockdevGetmirrorstatus(disks)]
313

    
314
  @staticmethod
315
  def perspective_blockdev_getmirrorstatus_multi(params):
316
    """Return the mirror status for a list of disks.
317

318
    """
319
    (node_disks, ) = params
320

    
321
    disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
322

    
323
    result = []
324

    
325
    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
326
      if success:
327
        result.append((success, status.ToDict()))
328
      else:
329
        result.append((success, status))
330

    
331
    return result
332

    
333
  @staticmethod
334
  def perspective_blockdev_find(params):
335
    """Expose the FindBlockDevice functionality for a disk.
336

337
    This will try to find but not activate a disk.
338

339
    """
340
    disk = objects.Disk.FromDict(params[0])
341

    
342
    result = backend.BlockdevFind(disk)
343
    if result is None:
344
      return None
345

    
346
    return result.ToDict()
347

    
348
  @staticmethod
349
  def perspective_blockdev_snapshot(params):
350
    """Create a snapshot device.
351

352
    Note that this is only valid for LVM disks, if we get passed
353
    something else we raise an exception. The snapshot device can be
354
    remove by calling the generic block device remove call.
355

356
    """
357
    cfbd = objects.Disk.FromDict(params[0])
358
    return backend.BlockdevSnapshot(cfbd)
359

    
360
  @staticmethod
361
  def perspective_blockdev_grow(params):
362
    """Grow a stack of devices.
363

364
    """
365
    if len(params) < 5:
366
      raise ValueError("Received only %s parameters in blockdev_grow,"
367
                       " old master?" % len(params))
368
    cfbd = objects.Disk.FromDict(params[0])
369
    amount = params[1]
370
    dryrun = params[2]
371
    backingstore = params[3]
372
    excl_stor = params[4]
373
    return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
374

    
375
  @staticmethod
376
  def perspective_blockdev_close(params):
377
    """Closes the given block devices.
378

379
    """
380
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
381
    return backend.BlockdevClose(params[0], disks)
382

    
383
  @staticmethod
384
  def perspective_blockdev_getdimensions(params):
385
    """Compute the sizes of the given block devices.
386

387
    """
388
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
389
    return backend.BlockdevGetdimensions(disks)
390

    
391
  @staticmethod
392
  def perspective_blockdev_setinfo(params):
393
    """Sets metadata information on the given block device.
394

395
    """
396
    (disk, info) = params
397
    disk = objects.Disk.FromDict(disk)
398
    return backend.BlockdevSetInfo(disk, info)
399

    
400
  # blockdev/drbd specific methods ----------
401

    
402
  @staticmethod
403
  def perspective_drbd_disconnect_net(params):
404
    """Disconnects the network connection of drbd disks.
405

406
    Note that this is only valid for drbd disks, so the members of the
407
    disk list must all be drbd devices.
408

409
    """
410
    (disks,) = params
411
    disks = [objects.Disk.FromDict(disk) for disk in disks]
412
    return backend.DrbdDisconnectNet(disks)
413

    
414
  @staticmethod
415
  def perspective_drbd_attach_net(params):
416
    """Attaches the network connection of drbd disks.
417

418
    Note that this is only valid for drbd disks, so the members of the
419
    disk list must all be drbd devices.
420

421
    """
422
    disks, instance_name, multimaster = params
423
    disks = [objects.Disk.FromDict(disk) for disk in disks]
424
    return backend.DrbdAttachNet(disks, instance_name, multimaster)
425

    
426
  @staticmethod
427
  def perspective_drbd_wait_sync(params):
428
    """Wait until DRBD disks are synched.
429

430
    Note that this is only valid for drbd disks, so the members of the
431
    disk list must all be drbd devices.
432

433
    """
434
    (disks,) = params
435
    disks = [objects.Disk.FromDict(disk) for disk in disks]
436
    return backend.DrbdWaitSync(disks)
437

    
438
  @staticmethod
439
  def perspective_drbd_needs_activation(params):
440
    """Checks if the drbd devices need activation
441

442
    Note that this is only valid for drbd disks, so the members of the
443
    disk list must all be drbd devices.
444

445
    """
446
    (disks,) = params
447
    disks = [objects.Disk.FromDict(disk) for disk in disks]
448
    return backend.DrbdNeedsActivation(disks)
449

    
450
  @staticmethod
451
  def perspective_drbd_helper(_):
452
    """Query drbd helper.
453

454
    """
455
    return backend.GetDrbdUsermodeHelper()
456

    
457
  # export/import  --------------------------
458

    
459
  @staticmethod
460
  def perspective_finalize_export(params):
461
    """Expose the finalize export functionality.
462

463
    """
464
    instance = objects.Instance.FromDict(params[0])
465

    
466
    snap_disks = []
467
    for disk in params[1]:
468
      if isinstance(disk, bool):
469
        snap_disks.append(disk)
470
      else:
471
        snap_disks.append(objects.Disk.FromDict(disk))
472

    
473
    return backend.FinalizeExport(instance, snap_disks)
474

    
475
  @staticmethod
476
  def perspective_export_info(params):
477
    """Query information about an existing export on this node.
478

479
    The given path may not contain an export, in which case we return
480
    None.
481

482
    """
483
    path = params[0]
484
    return backend.ExportInfo(path)
485

    
486
  @staticmethod
487
  def perspective_export_list(params):
488
    """List the available exports on this node.
489

490
    Note that as opposed to export_info, which may query data about an
491
    export in any path, this only queries the standard Ganeti path
492
    (pathutils.EXPORT_DIR).
493

494
    """
495
    return backend.ListExports()
496

    
497
  @staticmethod
498
  def perspective_export_remove(params):
499
    """Remove an export.
500

501
    """
502
    export = params[0]
503
    return backend.RemoveExport(export)
504

    
505
  # block device ---------------------
506
  @staticmethod
507
  def perspective_bdev_sizes(params):
508
    """Query the list of block devices
509

510
    """
511
    devices = params[0]
512
    return backend.GetBlockDevSizes(devices)
513

    
514
  # volume  --------------------------
515

    
516
  @staticmethod
517
  def perspective_lv_list(params):
518
    """Query the list of logical volumes in a given volume group.
519

520
    """
521
    vgname = params[0]
522
    return backend.GetVolumeList(vgname)
523

    
524
  @staticmethod
525
  def perspective_vg_list(params):
526
    """Query the list of volume groups.
527

528
    """
529
    return backend.ListVolumeGroups()
530

    
531
  # Storage --------------------------
532

    
533
  @staticmethod
534
  def perspective_storage_list(params):
535
    """Get list of storage units.
536

537
    """
538
    (su_name, su_args, name, fields) = params
539
    return container.GetStorage(su_name, *su_args).List(name, fields)
540

    
541
  @staticmethod
542
  def perspective_storage_modify(params):
543
    """Modify a storage unit.
544

545
    """
546
    (su_name, su_args, name, changes) = params
547
    return container.GetStorage(su_name, *su_args).Modify(name, changes)
548

    
549
  @staticmethod
550
  def perspective_storage_execute(params):
551
    """Execute an operation on a storage unit.
552

553
    """
554
    (su_name, su_args, name, op) = params
555
    return container.GetStorage(su_name, *su_args).Execute(name, op)
556

    
557
  # bridge  --------------------------
558

    
559
  @staticmethod
560
  def perspective_bridges_exist(params):
561
    """Check if all bridges given exist on this node.
562

563
    """
564
    bridges_list = params[0]
565
    return backend.BridgesExist(bridges_list)
566

    
567
  # instance  --------------------------
568

    
569
  @staticmethod
570
  def perspective_instance_os_add(params):
571
    """Install an OS on a given instance.
572

573
    """
574
    inst_s = params[0]
575
    inst = objects.Instance.FromDict(inst_s)
576
    reinstall = params[1]
577
    debug = params[2]
578
    return backend.InstanceOsAdd(inst, reinstall, debug)
579

    
580
  @staticmethod
581
  def perspective_instance_run_rename(params):
582
    """Runs the OS rename script for an instance.
583

584
    """
585
    inst_s, old_name, debug = params
586
    inst = objects.Instance.FromDict(inst_s)
587
    return backend.RunRenameInstance(inst, old_name, debug)
588

    
589
  @staticmethod
590
  def perspective_instance_shutdown(params):
591
    """Shutdown an instance.
592

593
    """
594
    instance = objects.Instance.FromDict(params[0])
595
    timeout = params[1]
596
    trail = params[2]
597
    _extendReasonTrail(trail, "shutdown")
598
    return backend.InstanceShutdown(instance, timeout, trail)
599

    
600
  @staticmethod
601
  def perspective_instance_start(params):
602
    """Start an instance.
603

604
    """
605
    (instance_name, startup_paused, trail) = params
606
    instance = objects.Instance.FromDict(instance_name)
607
    _extendReasonTrail(trail, "start")
608
    return backend.StartInstance(instance, startup_paused, trail)
609

    
610
  @staticmethod
611
  def perspective_hotplug_device(params):
612
    """Hotplugs device to a running instance.
613

614
    """
615
    (idict, action, dev_type, ddict, extra, seq) = params
616
    instance = objects.Instance.FromDict(idict)
617
    if dev_type == constants.HOTPLUG_TARGET_DISK:
618
      device = objects.Disk.FromDict(ddict)
619
    elif dev_type == constants.HOTPLUG_TARGET_NIC:
620
      device = objects.NIC.FromDict(ddict)
621
    else:
622
      assert dev_type in constants.HOTPLUG_ALL_TARGETS
623
    return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)
624

    
625
  @staticmethod
626
  def perspective_hotplug_supported(params):
627
    """Checks if hotplug is supported.
628

629
    """
630
    instance = objects.Instance.FromDict(params[0])
631
    return backend.HotplugSupported(instance)
632

    
633
  @staticmethod
634
  def perspective_migration_info(params):
635
    """Gather information about an instance to be migrated.
636

637
    """
638
    instance = objects.Instance.FromDict(params[0])
639
    return backend.MigrationInfo(instance)
640

    
641
  @staticmethod
642
  def perspective_accept_instance(params):
643
    """Prepare the node to accept an instance.
644

645
    """
646
    instance, info, target = params
647
    instance = objects.Instance.FromDict(instance)
648
    return backend.AcceptInstance(instance, info, target)
649

    
650
  @staticmethod
651
  def perspective_instance_finalize_migration_dst(params):
652
    """Finalize the instance migration on the destination node.
653

654
    """
655
    instance, info, success = params
656
    instance = objects.Instance.FromDict(instance)
657
    return backend.FinalizeMigrationDst(instance, info, success)
658

    
659
  @staticmethod
660
  def perspective_instance_migrate(params):
661
    """Migrates an instance.
662

663
    """
664
    cluster_name, instance, target, live = params
665
    instance = objects.Instance.FromDict(instance)
666
    return backend.MigrateInstance(cluster_name, instance, target, live)
667

    
668
  @staticmethod
669
  def perspective_instance_finalize_migration_src(params):
670
    """Finalize the instance migration on the source node.
671

672
    """
673
    instance, success, live = params
674
    instance = objects.Instance.FromDict(instance)
675
    return backend.FinalizeMigrationSource(instance, success, live)
676

    
677
  @staticmethod
678
  def perspective_instance_get_migration_status(params):
679
    """Reports migration status.
680

681
    """
682
    instance = objects.Instance.FromDict(params[0])
683
    return backend.GetMigrationStatus(instance).ToDict()
684

    
685
  @staticmethod
686
  def perspective_instance_reboot(params):
687
    """Reboot an instance.
688

689
    """
690
    instance = objects.Instance.FromDict(params[0])
691
    reboot_type = params[1]
692
    shutdown_timeout = params[2]
693
    trail = params[3]
694
    _extendReasonTrail(trail, "reboot")
695
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
696
                                  trail)
697

    
698
  @staticmethod
699
  def perspective_instance_balloon_memory(params):
700
    """Modify instance runtime memory.
701

702
    """
703
    instance_dict, memory = params
704
    instance = objects.Instance.FromDict(instance_dict)
705
    return backend.InstanceBalloonMemory(instance, memory)
706

    
707
  @staticmethod
708
  def perspective_instance_info(params):
709
    """Query instance information.
710

711
    """
712
    (instance_name, hypervisor_name, hvparams) = params
713
    return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
714

    
715
  @staticmethod
716
  def perspective_instance_migratable(params):
717
    """Query whether the specified instance can be migrated.
718

719
    """
720
    instance = objects.Instance.FromDict(params[0])
721
    return backend.GetInstanceMigratable(instance)
722

    
723
  @staticmethod
724
  def perspective_all_instances_info(params):
725
    """Query information about all instances.
726

727
    """
728
    (hypervisor_list, all_hvparams) = params
729
    return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
730

    
731
  @staticmethod
732
  def perspective_instance_console_info(params):
733
    """Query information on how to get console access to instances
734

735
    """
736
    return backend.GetInstanceConsoleInfo(params)
737

    
738
  @staticmethod
739
  def perspective_instance_list(params):
740
    """Query the list of running instances.
741

742
    """
743
    (hypervisor_list, hvparams) = params
744
    return backend.GetInstanceList(hypervisor_list, hvparams)
745

    
746
  # node --------------------------
747

    
748
  @staticmethod
749
  def perspective_node_has_ip_address(params):
750
    """Checks if a node has the given ip address.
751

752
    """
753
    return netutils.IPAddress.Own(params[0])
754

    
755
  @staticmethod
756
  def perspective_node_info(params):
757
    """Query node information.
758

759
    """
760
    (storage_units, hv_specs) = params
761
    return backend.GetNodeInfo(storage_units, hv_specs)
762

    
763
  @staticmethod
764
  def perspective_etc_hosts_modify(params):
765
    """Modify a node entry in /etc/hosts.
766

767
    """
768
    backend.EtcHostsModify(params[0], params[1], params[2])
769

    
770
    return True
771

    
772
  @staticmethod
773
  def perspective_node_verify(params):
774
    """Run a verify sequence on this node.
775

776
    """
777
    (what, cluster_name, hvparams, node_groups, groups_cfg) = params
778
    return backend.VerifyNode(what, cluster_name, hvparams,
779
                              node_groups, groups_cfg)
780

    
781
  @classmethod
782
  def perspective_node_verify_light(cls, params):
783
    """Run a light verify sequence on this node.
784

785
    This call is meant to perform a less strict verification of the node in
786
    certain situations. Right now, it is invoked only when a node is just about
787
    to be added to a cluster, and even then, it performs the same checks as
788
    L{perspective_node_verify}.
789
    """
790
    return cls.perspective_node_verify(params)
791

    
792
  @staticmethod
793
  def perspective_node_start_master_daemons(params):
794
    """Start the master daemons on this node.
795

796
    """
797
    return backend.StartMasterDaemons(params[0])
798

    
799
  @staticmethod
800
  def perspective_node_activate_master_ip(params):
801
    """Activate the master IP on this node.
802

803
    """
804
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
805
    return backend.ActivateMasterIp(master_params, params[1])
806

    
807
  @staticmethod
808
  def perspective_node_deactivate_master_ip(params):
809
    """Deactivate the master IP on this node.
810

811
    """
812
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
813
    return backend.DeactivateMasterIp(master_params, params[1])
814

    
815
  @staticmethod
816
  def perspective_node_stop_master(params):
817
    """Stops master daemons on this node.
818

819
    """
820
    return backend.StopMasterDaemons()
821

    
822
  @staticmethod
823
  def perspective_node_change_master_netmask(params):
824
    """Change the master IP netmask.
825

826
    """
827
    return backend.ChangeMasterNetmask(params[0], params[1], params[2],
828
                                       params[3])
829

    
830
  @staticmethod
831
  def perspective_node_leave_cluster(params):
832
    """Cleanup after leaving a cluster.
833

834
    """
835
    return backend.LeaveCluster(params[0])
836

    
837
  @staticmethod
838
  def perspective_node_volumes(params):
839
    """Query the list of all logical volume groups.
840

841
    """
842
    return backend.NodeVolumes()
843

    
844
  @staticmethod
845
  def perspective_node_demote_from_mc(params):
846
    """Demote a node from the master candidate role.
847

848
    """
849
    return backend.DemoteFromMC()
850

    
851
  @staticmethod
852
  def perspective_node_powercycle(params):
853
    """Tries to powercycle the node.
854

855
    """
856
    (hypervisor_type, hvparams) = params
857
    return backend.PowercycleNode(hypervisor_type, hvparams)
858

    
859
  @staticmethod
860
  def perspective_node_configure_ovs(params):
861
    """Sets up OpenvSwitch on the node.
862

863
    """
864
    (ovs_name, ovs_link) = params
865
    return backend.ConfigureOVS(ovs_name, ovs_link)
866

    
867
  @staticmethod
868
  def perspective_node_crypto_tokens(params):
869
    """Gets the node's public crypto tokens.
870

871
    """
872
    token_requests = params[0]
873
    return backend.GetCryptoTokens(token_requests)
874

    
875
  # cluster --------------------------
876

    
877
  @staticmethod
878
  def perspective_version(params):
879
    """Query version information.
880

881
    """
882
    return constants.PROTOCOL_VERSION
883

    
884
  @staticmethod
885
  def perspective_upload_file(params):
886
    """Upload a file.
887

888
    Note that the backend implementation imposes strict rules on which
889
    files are accepted.
890

891
    """
892
    return backend.UploadFile(*(params[0]))
893

    
894
  @staticmethod
895
  def perspective_master_info(params):
896
    """Query master information.
897

898
    """
899
    return backend.GetMasterInfo()
900

    
901
  @staticmethod
902
  def perspective_run_oob(params):
903
    """Runs oob on node.
904

905
    """
906
    output = backend.RunOob(params[0], params[1], params[2], params[3])
907
    if output:
908
      result = serializer.LoadJson(output)
909
    else:
910
      result = None
911
    return result
912

    
913
  @staticmethod
914
  def perspective_restricted_command(params):
915
    """Runs a restricted command.
916

917
    """
918
    (cmd, ) = params
919

    
920
    return backend.RunRestrictedCmd(cmd)
921

    
922
  @staticmethod
923
  def perspective_write_ssconf_files(params):
924
    """Write ssconf files.
925

926
    """
927
    (values,) = params
928
    return ssconf.WriteSsconfFiles(values)
929

    
930
  @staticmethod
931
  def perspective_get_watcher_pause(params):
932
    """Get watcher pause end.
933

934
    """
935
    return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
936

    
937
  @staticmethod
938
  def perspective_set_watcher_pause(params):
939
    """Set watcher pause.
940

941
    """
942
    (until, ) = params
943
    return backend.SetWatcherPause(until)
944

    
945
  # os -----------------------
946

    
947
  @staticmethod
948
  def perspective_os_diagnose(params):
949
    """Query detailed information about existing OSes.
950

951
    """
952
    return backend.DiagnoseOS()
953

    
954
  @staticmethod
955
  def perspective_os_get(params):
956
    """Query information about a given OS.
957

958
    """
959
    name = params[0]
960
    os_obj = backend.OSFromDisk(name)
961
    return os_obj.ToDict()
962

    
963
  @staticmethod
964
  def perspective_os_validate(params):
965
    """Run a given OS' validation routine.
966

967
    """
968
    required, name, checks, params = params
969
    return backend.ValidateOS(required, name, checks, params)
970

    
971
  # extstorage -----------------------
972

    
973
  @staticmethod
974
  def perspective_extstorage_diagnose(params):
975
    """Query detailed information about existing extstorage providers.
976

977
    """
978
    return backend.DiagnoseExtStorage()
979

    
980
  # hooks -----------------------
981

    
982
  @staticmethod
983
  def perspective_hooks_runner(params):
984
    """Run hook scripts.
985

986
    """
987
    hpath, phase, env = params
988
    hr = backend.HooksRunner()
989
    return hr.RunHooks(hpath, phase, env)
990

    
991
  # iallocator -----------------
992

    
993
  @staticmethod
994
  def perspective_iallocator_runner(params):
995
    """Run an iallocator script.
996

997
    """
998
    name, idata, ial_params_dict = params
999
    ial_params = []
1000
    for ial_param in ial_params_dict.items():
1001
      ial_params.append("--" + ial_param[0] + "=" + ial_param[1])
1002
    iar = backend.IAllocatorRunner()
1003
    return iar.Run(name, idata, ial_params)
1004

    
1005
  # test -----------------------
1006

    
1007
  @staticmethod
1008
  def perspective_test_delay(params):
1009
    """Run test delay.
1010

1011
    """
1012
    duration = params[0]
1013
    status, rval = utils.TestDelay(duration)
1014
    if not status:
1015
      raise backend.RPCFail(rval)
1016
    return rval
1017

    
1018
  # file storage ---------------
1019

    
1020
  @staticmethod
1021
  def perspective_file_storage_dir_create(params):
1022
    """Create the file storage directory.
1023

1024
    """
1025
    file_storage_dir = params[0]
1026
    return backend.CreateFileStorageDir(file_storage_dir)
1027

    
1028
  @staticmethod
1029
  def perspective_file_storage_dir_remove(params):
1030
    """Remove the file storage directory.
1031

1032
    """
1033
    file_storage_dir = params[0]
1034
    return backend.RemoveFileStorageDir(file_storage_dir)
1035

    
1036
  @staticmethod
1037
  def perspective_file_storage_dir_rename(params):
1038
    """Rename the file storage directory.
1039

1040
    """
1041
    old_file_storage_dir = params[0]
1042
    new_file_storage_dir = params[1]
1043
    return backend.RenameFileStorageDir(old_file_storage_dir,
1044
                                        new_file_storage_dir)
1045

    
1046
  # jobs ------------------------
1047

    
1048
  @staticmethod
1049
  @_RequireJobQueueLock
1050
  def perspective_jobqueue_update(params):
1051
    """Update job queue.
1052

1053
    """
1054
    (file_name, content) = params
1055
    return backend.JobQueueUpdate(file_name, content)
1056

    
1057
  @staticmethod
1058
  @_RequireJobQueueLock
1059
  def perspective_jobqueue_purge(params):
1060
    """Purge job queue.
1061

1062
    """
1063
    return backend.JobQueuePurge()
1064

    
1065
  @staticmethod
1066
  @_RequireJobQueueLock
1067
  def perspective_jobqueue_rename(params):
1068
    """Rename a job queue file.
1069

1070
    """
1071
    # TODO: What if a file fails to rename?
1072
    return [backend.JobQueueRename(old, new) for old, new in params[0]]
1073

    
1074
  @staticmethod
1075
  @_RequireJobQueueLock
1076
  def perspective_jobqueue_set_drain_flag(params):
1077
    """Set job queue's drain flag.
1078

1079
    """
1080
    (flag, ) = params
1081

    
1082
    return jstore.SetDrainFlag(flag)
1083

    
1084
  # hypervisor ---------------
1085

    
1086
  @staticmethod
1087
  def perspective_hypervisor_validate_params(params):
1088
    """Validate the hypervisor parameters.
1089

1090
    """
1091
    (hvname, hvparams) = params
1092
    return backend.ValidateHVParams(hvname, hvparams)
1093

    
1094
  # Crypto
1095

    
1096
  @staticmethod
1097
  def perspective_x509_cert_create(params):
1098
    """Creates a new X509 certificate for SSL/TLS.
1099

1100
    """
1101
    (validity, ) = params
1102
    return backend.CreateX509Certificate(validity)
1103

    
1104
  @staticmethod
1105
  def perspective_x509_cert_remove(params):
1106
    """Removes a X509 certificate.
1107

1108
    """
1109
    (name, ) = params
1110
    return backend.RemoveX509Certificate(name)
1111

    
1112
  # Import and export
1113

    
1114
  @staticmethod
1115
  def perspective_import_start(params):
1116
    """Starts an import daemon.
1117

1118
    """
1119
    (opts_s, instance, component, (dest, dest_args)) = params
1120

    
1121
    opts = objects.ImportExportOptions.FromDict(opts_s)
1122

    
1123
    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
1124
                                           None, None,
1125
                                           objects.Instance.FromDict(instance),
1126
                                           component, dest,
1127
                                           _DecodeImportExportIO(dest,
1128
                                                                 dest_args))
1129

    
1130
  @staticmethod
1131
  def perspective_export_start(params):
1132
    """Starts an export daemon.
1133

1134
    """
1135
    (opts_s, host, port, instance, component, (source, source_args)) = params
1136

    
1137
    opts = objects.ImportExportOptions.FromDict(opts_s)
1138

    
1139
    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
1140
                                           host, port,
1141
                                           objects.Instance.FromDict(instance),
1142
                                           component, source,
1143
                                           _DecodeImportExportIO(source,
1144
                                                                 source_args))
1145

    
1146
  @staticmethod
1147
  def perspective_impexp_status(params):
1148
    """Retrieves the status of an import or export daemon.
1149

1150
    """
1151
    return backend.GetImportExportStatus(params[0])
1152

    
1153
  @staticmethod
1154
  def perspective_impexp_abort(params):
1155
    """Aborts an import or export.
1156

1157
    """
1158
    return backend.AbortImportExport(params[0])
1159

    
1160
  @staticmethod
1161
  def perspective_impexp_cleanup(params):
1162
    """Cleans up after an import or export.
1163

1164
    """
1165
    return backend.CleanupImportExport(params[0])
1166

    
1167

    
1168
def CheckNoded(_, args):
1169
  """Initial checks whether to run or exit with a failure.
1170

1171
  """
1172
  if args: # noded doesn't take any arguments
1173
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
1174
                          sys.argv[0])
1175
    sys.exit(constants.EXIT_FAILURE)
1176
  try:
1177
    codecs.lookup("string-escape")
1178
  except LookupError:
1179
    print >> sys.stderr, ("Can't load the string-escape code which is part"
1180
                          " of the Python installation. Is your installation"
1181
                          " complete/correct? Aborting.")
1182
    sys.exit(constants.EXIT_FAILURE)
1183

    
1184

    
1185
def SSLVerifyPeer(conn, cert, errnum, errdepth, ok):
1186
  """Callback function to verify a peer against the candidate cert map.
1187

1188
  Note that we have a chicken-and-egg problem during cluster init and upgrade.
1189
  This method checks whether the incoming connection comes from a master
1190
  candidate by comparing it to the master certificate map in the cluster
1191
  configuration. However, during cluster init and cluster upgrade there
1192
  are various RPC calls done to the master node itself, before the candidate
1193
  certificate list is established and the cluster configuration is written.
1194
  In this case, we cannot check against the master candidate map.
1195

1196
  This problem is solved by checking whether the candidate map is empty. An
1197
  initialized 2.11 or higher cluster has at least one entry for the master
1198
  node in the candidate map. If the map is empty, we know that we are still
1199
  in the bootstrap/upgrade phase. In this case, we read the server certificate
1200
  digest and compare it to the incoming request.
1201

1202
  This means that after an upgrade of Ganeti, the system continues to operate
1203
  like before, using server certificates only. After the client certificates
1204
  are generated with ``gnt-cluster renew-crypto --new-node-certificates``,
1205
  RPC communication is switched to using client certificates and the trick of
1206
  using server certificates does not work anymore.
1207

1208
  @type conn: C{OpenSSL.SSL.Connection}
1209
  @param conn: the OpenSSL connection object
1210
  @type cert: C{OpenSSL.X509}
1211
  @param cert: the peer's SSL certificate
1212

1213
  """
1214
  # some parameters are unused, but this is the API
1215
  # pylint: disable=W0613
1216
  _BOOTSTRAP = "bootstrap"
1217
  sstore = ssconf.SimpleStore()
1218
  try:
1219
    candidate_certs = sstore.GetMasterCandidatesCertMap()
1220
  except errors.ConfigurationError:
1221
    logging.info("No candidate certificates found. Switching to "
1222
                 "bootstrap/update mode.")
1223
    candidate_certs = None
1224
  if not candidate_certs:
1225
    candidate_certs = {
1226
      _BOOTSTRAP: utils.GetCertificateDigest(
1227
        cert_filename=pathutils.NODED_CERT_FILE)}
1228
  return cert.digest("sha1") in candidate_certs.values()
1229
  # pylint: enable=W0613
1230

    
1231

    
1232
def PrepNoded(options, _):
1233
  """Preparation node daemon function, executed with the PID file held.
1234

1235
  """
1236
  if options.mlock:
1237
    request_executor_class = MlockallRequestExecutor
1238
    try:
1239
      utils.Mlockall()
1240
    except errors.NoCtypesError:
1241
      logging.warning("Cannot set memory lock, ctypes module not found")
1242
      request_executor_class = http.server.HttpServerRequestExecutor
1243
  else:
1244
    request_executor_class = http.server.HttpServerRequestExecutor
1245

    
1246
  # Read SSL certificate
1247
  if options.ssl:
1248
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
1249
                                    ssl_cert_path=options.ssl_cert)
1250
  else:
1251
    ssl_params = None
1252

    
1253
  err = _PrepareQueueLock()
1254
  if err is not None:
1255
    # this might be some kind of file-system/permission error; while
1256
    # this breaks the job queue functionality, we shouldn't prevent
1257
    # startup of the whole node daemon because of this
1258
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
1259

    
1260
  handler = NodeRequestHandler()
1261

    
1262
  mainloop = daemon.Mainloop()
1263
  server = \
1264
    http.server.HttpServer(mainloop, options.bind_address, options.port,
1265
                           handler, ssl_params=ssl_params, ssl_verify_peer=True,
1266
                           request_executor_class=request_executor_class,
1267
                           ssl_verify_callback=SSLVerifyPeer)
1268
  server.Start()
1269

    
1270
  return (mainloop, server)
1271

    
1272

    
1273
def ExecNoded(options, args, prep_data): # pylint: disable=W0613
1274
  """Main node daemon function, executed with the PID file held.
1275

1276
  """
1277
  (mainloop, server) = prep_data
1278
  try:
1279
    mainloop.Run()
1280
  finally:
1281
    server.Stop()
1282

    
1283

    
1284
def Main():
1285
  """Main function for the node daemon.
1286

1287
  """
1288
  parser = OptionParser(description="Ganeti node daemon",
1289
                        usage=("%prog [-f] [-d] [-p port] [-b ADDRESS]"
1290
                               " [-i INTERFACE]"),
1291
                        version="%%prog (ganeti) %s" %
1292
                        constants.RELEASE_VERSION)
1293
  parser.add_option("--no-mlock", dest="mlock",
1294
                    help="Do not mlock the node memory in ram",
1295
                    default=True, action="store_false")
1296

    
1297
  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1298
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
1299
                     default_ssl_key=pathutils.NODED_CERT_FILE,
1300
                     console_logging=True)