Statistics
| Branch: | Tag: | Revision:

root / lib / server / noded.py @ a9f33339

History | View | Annotate | Download (33.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import codecs
import functools
import logging
import os
import signal
import sys

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti.storage import container
from ganeti import serializer
from ganeti import netutils
from ganeti import pathutils
from ganeti import ssconf

import ganeti.http.server # pylint: disable=W0611
55

    
56

    
57
queue_lock = None
58

    
59

    
60
def _extendReasonTrail(trail, source, reason=""):
  """Extend the reason trail with noded information

  The trail is extended by appending the name of the noded functionality

  @param trail: the reason trail to extend; must not be None
  @param source: name of the noded functionality adding the entry
  @param reason: optional free-form reason text

  """
  assert trail is not None
  entry = ("%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source),
           reason,
           utils.EpochNano())
  trail.append(entry)
68

    
69

    
70
def _PrepareQueueLock():
71
  """Try to prepare the queue lock.
72

73
  @return: None for success, otherwise an exception object
74

75
  """
76
  global queue_lock # pylint: disable=W0603
77

    
78
  if queue_lock is not None:
79
    return None
80

    
81
  # Prepare job queue
82
  try:
83
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
84
    return None
85
  except EnvironmentError, err:
86
    return err
87

    
88

    
89
def _RequireJobQueueLock(fn):
90
  """Decorator for job queue manipulating functions.
91

92
  """
93
  QUEUE_LOCK_TIMEOUT = 10
94

    
95
  def wrapper(*args, **kwargs):
96
    # Locking in exclusive, blocking mode because there could be several
97
    # children running at the same time. Waiting up to 10 seconds.
98
    if _PrepareQueueLock() is not None:
99
      raise errors.JobQueueError("Job queue failed initialization,"
100
                                 " cannot update jobs")
101
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
102
    try:
103
      return fn(*args, **kwargs)
104
    finally:
105
      queue_lock.Unlock()
106

    
107
  return wrapper
108

    
109

    
110
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  Disk arguments are deserialized into disk objects; any other kind of
  I/O information is passed through unchanged.

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    (disk_data, ) = ieioargs
    return (objects.Disk.FromDict(disk_data), )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    (disk_data, script_arg) = ieioargs
    return (objects.Disk.FromDict(disk_data), script_arg)
  else:
    return ieioargs
123

    
124

    
125
def _DefaultAlternative(value, default):
126
  """Returns value or, if evaluating to False, a default value.
127

128
  Returns the given value, unless it evaluates to False. In the latter case the
129
  default value is returned.
130

131
  @param value: Value to return if it doesn't evaluate to False
132
  @param default: Default value
133
  @return: Given value or the default
134

135
  """
136
  if value:
137
    return value
138

    
139
  return default
140

    
141

    
142
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Subclass ensuring request handlers are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    # Lock the process' pages into RAM before serving any request, so
    # request handling does not stall on paging/swapping
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
150

    
151

    
152
class NodeRequestHandler(http.server.HttpServerHandler):
153
  """The server implementation.
154

155
  This class holds all methods exposed over the RPC interface.
156

157
  """
158
  # too many public methods, and unused args - all methods get params
159
  # due to the API
160
  # pylint: disable=R0904,W0613
161
  def __init__(self):
    http.server.HttpServerHandler.__init__(self)
    # PID of this daemon process; used by HandleRequest to SIGTERM
    # ourselves when a QuitGanetiException requests shutdown
    self.noded_pid = os.getpid()
164

    
165
  def HandleRequest(self, req):
    """Handle a request.

    Dispatches a POST request to the C{perspective_*} method named after
    the request path, deserializing the body from JSON and serializing
    the method's result back to JSON as a (success, payload) tuple.

    @param req: the HTTP request to handle
    @return: JSON-encoded result tuple
    @raise http.HttpBadRequest: if the request method is not POST
    @raise http.HttpNotFound: if no matching perspective method exists

    """

    if req.request_method.upper() != http.HTTP_POST:
      raise http.HttpBadRequest("Only the POST method is supported")

    # Strip the leading slash so the path maps onto a method name suffix
    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      # Catch-all: report any other failure back to the caller as a
      # (False, message) result instead of dying with an HTTP error
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)
202

    
203
  # the new block devices  --------------------------
204

    
205
  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    (bdev_s, size, owner, on_primary, info, excl_stor) = params
    disk = objects.Disk.FromDict(bdev_s)
    if disk is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(disk, size, owner, on_primary, info,
                                  excl_stor)
216

    
217
  @staticmethod
218
  def perspective_blockdev_pause_resume_sync(params):
219
    """Pause/resume sync of a block device.
220

221
    """
222
    disks_s, pause = params
223
    disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
224
    return backend.BlockdevPauseResumeSync(disks, pause)
225

    
226
  @staticmethod
227
  def perspective_blockdev_wipe(params):
228
    """Wipe a block device.
229

230
    """
231
    bdev_s, offset, size = params
232
    bdev = objects.Disk.FromDict(bdev_s)
233
    return backend.BlockdevWipe(bdev, offset, size)
234

    
235
  @staticmethod
236
  def perspective_blockdev_remove(params):
237
    """Remove a block device.
238

239
    """
240
    bdev_s = params[0]
241
    bdev = objects.Disk.FromDict(bdev_s)
242
    return backend.BlockdevRemove(bdev)
243

    
244
  @staticmethod
245
  def perspective_blockdev_rename(params):
246
    """Remove a block device.
247

248
    """
249
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
250
    return backend.BlockdevRename(devlist)
251

    
252
  @staticmethod
253
  def perspective_blockdev_assemble(params):
254
    """Assemble a block device.
255

256
    """
257
    bdev_s, owner, on_primary, idx = params
258
    bdev = objects.Disk.FromDict(bdev_s)
259
    if bdev is None:
260
      raise ValueError("can't unserialize data!")
261
    return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
262

    
263
  @staticmethod
264
  def perspective_blockdev_shutdown(params):
265
    """Shutdown a block device.
266

267
    """
268
    bdev_s = params[0]
269
    bdev = objects.Disk.FromDict(bdev_s)
270
    if bdev is None:
271
      raise ValueError("can't unserialize data!")
272
    return backend.BlockdevShutdown(bdev)
273

    
274
  @staticmethod
275
  def perspective_blockdev_addchildren(params):
276
    """Add a child to a mirror device.
277

278
    Note: this is only valid for mirror devices. It's the caller's duty
279
    to send a correct disk, otherwise we raise an error.
280

281
    """
282
    bdev_s, ndev_s = params
283
    bdev = objects.Disk.FromDict(bdev_s)
284
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
285
    if bdev is None or ndevs.count(None) > 0:
286
      raise ValueError("can't unserialize data!")
287
    return backend.BlockdevAddchildren(bdev, ndevs)
288

    
289
  @staticmethod
290
  def perspective_blockdev_removechildren(params):
291
    """Remove a child from a mirror device.
292

293
    This is only valid for mirror devices, of course. It's the callers
294
    duty to send a correct disk, otherwise we raise an error.
295

296
    """
297
    bdev_s, ndev_s = params
298
    bdev = objects.Disk.FromDict(bdev_s)
299
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
300
    if bdev is None or ndevs.count(None) > 0:
301
      raise ValueError("can't unserialize data!")
302
    return backend.BlockdevRemovechildren(bdev, ndevs)
303

    
304
  @staticmethod
305
  def perspective_blockdev_getmirrorstatus(params):
306
    """Return the mirror status for a list of disks.
307

308
    """
309
    disks = [objects.Disk.FromDict(dsk_s)
310
             for dsk_s in params[0]]
311
    return [status.ToDict()
312
            for status in backend.BlockdevGetmirrorstatus(disks)]
313

    
314
  @staticmethod
  def perspective_blockdev_getmirrorstatus_multi(params):
    """Return the mirror status for a list of disks.

    """
    (node_disks, ) = params

    disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]

    result = []
    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
      # Successful entries carry a status object that must be serialized;
      # failed entries already carry a plain payload
      payload = status.ToDict() if success else status
      result.append((success, payload))

    return result
332

    
333
  @staticmethod
334
  def perspective_blockdev_find(params):
335
    """Expose the FindBlockDevice functionality for a disk.
336

337
    This will try to find but not activate a disk.
338

339
    """
340
    disk = objects.Disk.FromDict(params[0])
341

    
342
    result = backend.BlockdevFind(disk)
343
    if result is None:
344
      return None
345

    
346
    return result.ToDict()
347

    
348
  @staticmethod
349
  def perspective_blockdev_snapshot(params):
350
    """Create a snapshot device.
351

352
    Note that this is only valid for LVM disks, if we get passed
353
    something else we raise an exception. The snapshot device can be
354
    remove by calling the generic block device remove call.
355

356
    """
357
    cfbd = objects.Disk.FromDict(params[0])
358
    return backend.BlockdevSnapshot(cfbd)
359

    
360
  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    # Older masters sent fewer parameters; reject those explicitly
    if len(params) < 5:
      raise ValueError("Received only %s parameters in blockdev_grow,"
                       " old master?" % len(params))
    (disk_data, amount, dryrun, backingstore, excl_stor) = params[:5]
    cfbd = objects.Disk.FromDict(disk_data)
    return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore, excl_stor)
374

    
375
  @staticmethod
376
  def perspective_blockdev_close(params):
377
    """Closes the given block devices.
378

379
    """
380
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
381
    return backend.BlockdevClose(params[0], disks)
382

    
383
  @staticmethod
384
  def perspective_blockdev_getdimensions(params):
385
    """Compute the sizes of the given block devices.
386

387
    """
388
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
389
    return backend.BlockdevGetdimensions(disks)
390

    
391
  @staticmethod
392
  def perspective_blockdev_setinfo(params):
393
    """Sets metadata information on the given block device.
394

395
    """
396
    (disk, info) = params
397
    disk = objects.Disk.FromDict(disk)
398
    return backend.BlockdevSetInfo(disk, info)
399

    
400
  # blockdev/drbd specific methods ----------
401

    
402
  @staticmethod
403
  def perspective_drbd_disconnect_net(params):
404
    """Disconnects the network connection of drbd disks.
405

406
    Note that this is only valid for drbd disks, so the members of the
407
    disk list must all be drbd devices.
408

409
    """
410
    (disks,) = params
411
    disks = [objects.Disk.FromDict(disk) for disk in disks]
412
    return backend.DrbdDisconnectNet(disks)
413

    
414
  @staticmethod
415
  def perspective_drbd_attach_net(params):
416
    """Attaches the network connection of drbd disks.
417

418
    Note that this is only valid for drbd disks, so the members of the
419
    disk list must all be drbd devices.
420

421
    """
422
    disks, instance_name, multimaster = params
423
    disks = [objects.Disk.FromDict(disk) for disk in disks]
424
    return backend.DrbdAttachNet(disks, instance_name, multimaster)
425

    
426
  @staticmethod
427
  def perspective_drbd_wait_sync(params):
428
    """Wait until DRBD disks are synched.
429

430
    Note that this is only valid for drbd disks, so the members of the
431
    disk list must all be drbd devices.
432

433
    """
434
    (disks,) = params
435
    disks = [objects.Disk.FromDict(disk) for disk in disks]
436
    return backend.DrbdWaitSync(disks)
437

    
438
  @staticmethod
439
  def perspective_drbd_needs_activation(params):
440
    """Checks if the drbd devices need activation
441

442
    Note that this is only valid for drbd disks, so the members of the
443
    disk list must all be drbd devices.
444

445
    """
446
    (disks,) = params
447
    disks = [objects.Disk.FromDict(disk) for disk in disks]
448
    return backend.DrbdNeedsActivation(disks)
449

    
450
  @staticmethod
451
  def perspective_drbd_helper(_):
452
    """Query drbd helper.
453

454
    """
455
    return backend.GetDrbdUsermodeHelper()
456

    
457
  # export/import  --------------------------
458

    
459
  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])

    # Each entry is either a plain boolean or a serialized disk object
    snap_disks = [disk if isinstance(disk, bool)
                  else objects.Disk.FromDict(disk)
                  for disk in params[1]]

    return backend.FinalizeExport(instance, snap_disks)
474

    
475
  @staticmethod
476
  def perspective_export_info(params):
477
    """Query information about an existing export on this node.
478

479
    The given path may not contain an export, in which case we return
480
    None.
481

482
    """
483
    path = params[0]
484
    return backend.ExportInfo(path)
485

    
486
  @staticmethod
487
  def perspective_export_list(params):
488
    """List the available exports on this node.
489

490
    Note that as opposed to export_info, which may query data about an
491
    export in any path, this only queries the standard Ganeti path
492
    (pathutils.EXPORT_DIR).
493

494
    """
495
    return backend.ListExports()
496

    
497
  @staticmethod
498
  def perspective_export_remove(params):
499
    """Remove an export.
500

501
    """
502
    export = params[0]
503
    return backend.RemoveExport(export)
504

    
505
  # block device ---------------------
506
  @staticmethod
507
  def perspective_bdev_sizes(params):
508
    """Query the list of block devices
509

510
    """
511
    devices = params[0]
512
    return backend.GetBlockDevSizes(devices)
513

    
514
  # volume  --------------------------
515

    
516
  @staticmethod
517
  def perspective_lv_list(params):
518
    """Query the list of logical volumes in a given volume group.
519

520
    """
521
    vgname = params[0]
522
    return backend.GetVolumeList(vgname)
523

    
524
  @staticmethod
525
  def perspective_vg_list(params):
526
    """Query the list of volume groups.
527

528
    """
529
    return backend.ListVolumeGroups()
530

    
531
  # Storage --------------------------
532

    
533
  @staticmethod
534
  def perspective_storage_list(params):
535
    """Get list of storage units.
536

537
    """
538
    (su_name, su_args, name, fields) = params
539
    return container.GetStorage(su_name, *su_args).List(name, fields)
540

    
541
  @staticmethod
542
  def perspective_storage_modify(params):
543
    """Modify a storage unit.
544

545
    """
546
    (su_name, su_args, name, changes) = params
547
    return container.GetStorage(su_name, *su_args).Modify(name, changes)
548

    
549
  @staticmethod
550
  def perspective_storage_execute(params):
551
    """Execute an operation on a storage unit.
552

553
    """
554
    (su_name, su_args, name, op) = params
555
    return container.GetStorage(su_name, *su_args).Execute(name, op)
556

    
557
  # bridge  --------------------------
558

    
559
  @staticmethod
560
  def perspective_bridges_exist(params):
561
    """Check if all bridges given exist on this node.
562

563
    """
564
    bridges_list = params[0]
565
    return backend.BridgesExist(bridges_list)
566

    
567
  # instance  --------------------------
568

    
569
  @staticmethod
570
  def perspective_instance_os_add(params):
571
    """Install an OS on a given instance.
572

573
    """
574
    inst_s = params[0]
575
    inst = objects.Instance.FromDict(inst_s)
576
    reinstall = params[1]
577
    debug = params[2]
578
    return backend.InstanceOsAdd(inst, reinstall, debug)
579

    
580
  @staticmethod
581
  def perspective_instance_run_rename(params):
582
    """Runs the OS rename script for an instance.
583

584
    """
585
    inst_s, old_name, debug = params
586
    inst = objects.Instance.FromDict(inst_s)
587
    return backend.RunRenameInstance(inst, old_name, debug)
588

    
589
  @staticmethod
590
  def perspective_instance_shutdown(params):
591
    """Shutdown an instance.
592

593
    """
594
    instance = objects.Instance.FromDict(params[0])
595
    timeout = params[1]
596
    trail = params[2]
597
    _extendReasonTrail(trail, "shutdown")
598
    return backend.InstanceShutdown(instance, timeout, trail)
599

    
600
  @staticmethod
601
  def perspective_instance_start(params):
602
    """Start an instance.
603

604
    """
605
    (instance_name, startup_paused, trail) = params
606
    instance = objects.Instance.FromDict(instance_name)
607
    _extendReasonTrail(trail, "start")
608
    return backend.StartInstance(instance, startup_paused, trail)
609

    
610
  @staticmethod
  def perspective_hotplug_device(params):
    """Hotplugs device to a running instance.

    @param params: (instance dict, hotplug action, device type,
        device dict, extra info, sequence number)

    """
    (idict, action, dev_type, ddict, extra, seq) = params
    instance = objects.Instance.FromDict(idict)
    if dev_type == constants.HOTPLUG_TARGET_DISK:
      device = objects.Disk.FromDict(ddict)
    elif dev_type == constants.HOTPLUG_TARGET_NIC:
      device = objects.NIC.FromDict(ddict)
    else:
      # Previously this branch only asserted membership in
      # HOTPLUG_ALL_TARGETS and then crashed with an unbound local
      # "device"; raise a clear error instead (caught by HandleRequest)
      raise ValueError("Invalid hotplug device type: %s" % dev_type)
    return backend.HotplugDevice(instance, action, dev_type, device, extra, seq)
624

    
625
  @staticmethod
626
  def perspective_hotplug_supported(params):
627
    """Checks if hotplug is supported.
628

629
    """
630
    instance = objects.Instance.FromDict(params[0])
631
    return backend.HotplugSupported(instance)
632

    
633
  @staticmethod
634
  def perspective_migration_info(params):
635
    """Gather information about an instance to be migrated.
636

637
    """
638
    instance = objects.Instance.FromDict(params[0])
639
    return backend.MigrationInfo(instance)
640

    
641
  @staticmethod
642
  def perspective_accept_instance(params):
643
    """Prepare the node to accept an instance.
644

645
    """
646
    instance, info, target = params
647
    instance = objects.Instance.FromDict(instance)
648
    return backend.AcceptInstance(instance, info, target)
649

    
650
  @staticmethod
651
  def perspective_instance_finalize_migration_dst(params):
652
    """Finalize the instance migration on the destination node.
653

654
    """
655
    instance, info, success = params
656
    instance = objects.Instance.FromDict(instance)
657
    return backend.FinalizeMigrationDst(instance, info, success)
658

    
659
  @staticmethod
660
  def perspective_instance_migrate(params):
661
    """Migrates an instance.
662

663
    """
664
    cluster_name, instance, target, live = params
665
    instance = objects.Instance.FromDict(instance)
666
    return backend.MigrateInstance(cluster_name, instance, target, live)
667

    
668
  @staticmethod
669
  def perspective_instance_finalize_migration_src(params):
670
    """Finalize the instance migration on the source node.
671

672
    """
673
    instance, success, live = params
674
    instance = objects.Instance.FromDict(instance)
675
    return backend.FinalizeMigrationSource(instance, success, live)
676

    
677
  @staticmethod
678
  def perspective_instance_get_migration_status(params):
679
    """Reports migration status.
680

681
    """
682
    instance = objects.Instance.FromDict(params[0])
683
    return backend.GetMigrationStatus(instance).ToDict()
684

    
685
  @staticmethod
686
  def perspective_instance_reboot(params):
687
    """Reboot an instance.
688

689
    """
690
    instance = objects.Instance.FromDict(params[0])
691
    reboot_type = params[1]
692
    shutdown_timeout = params[2]
693
    trail = params[3]
694
    _extendReasonTrail(trail, "reboot")
695
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
696
                                  trail)
697

    
698
  @staticmethod
699
  def perspective_instance_balloon_memory(params):
700
    """Modify instance runtime memory.
701

702
    """
703
    instance_dict, memory = params
704
    instance = objects.Instance.FromDict(instance_dict)
705
    return backend.InstanceBalloonMemory(instance, memory)
706

    
707
  @staticmethod
708
  def perspective_instance_info(params):
709
    """Query instance information.
710

711
    """
712
    (instance_name, hypervisor_name, hvparams) = params
713
    return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
714

    
715
  @staticmethod
716
  def perspective_instance_migratable(params):
717
    """Query whether the specified instance can be migrated.
718

719
    """
720
    instance = objects.Instance.FromDict(params[0])
721
    return backend.GetInstanceMigratable(instance)
722

    
723
  @staticmethod
724
  def perspective_all_instances_info(params):
725
    """Query information about all instances.
726

727
    """
728
    (hypervisor_list, all_hvparams) = params
729
    return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
730

    
731
  @staticmethod
732
  def perspective_instance_console_info(params):
733
    """Query information on how to get console access to instances
734

735
    """
736
    return backend.GetInstanceConsoleInfo(params)
737

    
738
  @staticmethod
739
  def perspective_instance_list(params):
740
    """Query the list of running instances.
741

742
    """
743
    (hypervisor_list, hvparams) = params
744
    return backend.GetInstanceList(hypervisor_list, hvparams)
745

    
746
  # node --------------------------
747

    
748
  @staticmethod
749
  def perspective_node_has_ip_address(params):
750
    """Checks if a node has the given ip address.
751

752
    """
753
    return netutils.IPAddress.Own(params[0])
754

    
755
  @staticmethod
756
  def perspective_node_info(params):
757
    """Query node information.
758

759
    """
760
    (storage_units, hv_specs) = params
761
    return backend.GetNodeInfo(storage_units, hv_specs)
762

    
763
  @staticmethod
  def perspective_etc_hosts_modify(params):
    """Modify a node entry in /etc/hosts.

    """
    # The first three parameters are passed through to the backend as-is
    backend.EtcHostsModify(*params[:3])

    return True
771

    
772
  @staticmethod
773
  def perspective_node_verify(params):
774
    """Run a verify sequence on this node.
775

776
    """
777
    (what, cluster_name, hvparams, node_groups, groups_cfg) = params
778
    return backend.VerifyNode(what, cluster_name, hvparams,
779
                              node_groups, groups_cfg)
780

    
781
  @classmethod
782
  def perspective_node_verify_light(cls, params):
783
    """Run a light verify sequence on this node.
784

785
    This call is meant to perform a less strict verification of the node in
786
    certain situations. Right now, it is invoked only when a node is just about
787
    to be added to a cluster, and even then, it performs the same checks as
788
    L{perspective_node_verify}.
789
    """
790
    return cls.perspective_node_verify(params)
791

    
792
  @staticmethod
793
  def perspective_node_start_master_daemons(params):
794
    """Start the master daemons on this node.
795

796
    """
797
    return backend.StartMasterDaemons(params[0])
798

    
799
  @staticmethod
800
  def perspective_node_activate_master_ip(params):
801
    """Activate the master IP on this node.
802

803
    """
804
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
805
    return backend.ActivateMasterIp(master_params, params[1])
806

    
807
  @staticmethod
808
  def perspective_node_deactivate_master_ip(params):
809
    """Deactivate the master IP on this node.
810

811
    """
812
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
813
    return backend.DeactivateMasterIp(master_params, params[1])
814

    
815
  @staticmethod
816
  def perspective_node_stop_master(params):
817
    """Stops master daemons on this node.
818

819
    """
820
    return backend.StopMasterDaemons()
821

    
822
  @staticmethod
823
  def perspective_node_change_master_netmask(params):
824
    """Change the master IP netmask.
825

826
    """
827
    return backend.ChangeMasterNetmask(params[0], params[1], params[2],
828
                                       params[3])
829

    
830
  @staticmethod
831
  def perspective_node_leave_cluster(params):
832
    """Cleanup after leaving a cluster.
833

834
    """
835
    return backend.LeaveCluster(params[0])
836

    
837
  @staticmethod
838
  def perspective_node_volumes(params):
839
    """Query the list of all logical volume groups.
840

841
    """
842
    return backend.NodeVolumes()
843

    
844
  @staticmethod
845
  def perspective_node_demote_from_mc(params):
846
    """Demote a node from the master candidate role.
847

848
    """
849
    return backend.DemoteFromMC()
850

    
851
  @staticmethod
852
  def perspective_node_powercycle(params):
853
    """Tries to powercycle the node.
854

855
    """
856
    (hypervisor_type, hvparams) = params
857
    return backend.PowercycleNode(hypervisor_type, hvparams)
858

    
859
  @staticmethod
860
  def perspective_node_configure_ovs(params):
861
    """Sets up OpenvSwitch on the node.
862

863
    """
864
    (ovs_name, ovs_link) = params
865
    return backend.ConfigureOVS(ovs_name, ovs_link)
866

    
867
  # cluster --------------------------
868

    
869
  @staticmethod
  def perspective_version(params):
    """Query version information.

    @return: the node daemon's protocol version number

    """
    return constants.PROTOCOL_VERSION
875

    
876
  @staticmethod
877
  def perspective_upload_file(params):
878
    """Upload a file.
879

880
    Note that the backend implementation imposes strict rules on which
881
    files are accepted.
882

883
    """
884
    return backend.UploadFile(*(params[0]))
885

    
886
  @staticmethod
887
  def perspective_master_info(params):
888
    """Query master information.
889

890
    """
891
    return backend.GetMasterInfo()
892

    
893
  @staticmethod
  def perspective_run_oob(params):
    """Runs oob on node.

    """
    output = backend.RunOob(params[0], params[1], params[2], params[3])
    # An empty/None output means there is nothing to decode
    if not output:
      return None
    return serializer.LoadJson(output)
904

    
905
  @staticmethod
906
  def perspective_restricted_command(params):
907
    """Runs a restricted command.
908

909
    """
910
    (cmd, ) = params
911

    
912
    return backend.RunRestrictedCmd(cmd)
913

    
914
  @staticmethod
915
  def perspective_write_ssconf_files(params):
916
    """Write ssconf files.
917

918
    """
919
    (values,) = params
920
    return ssconf.WriteSsconfFiles(values)
921

    
922
  @staticmethod
923
  def perspective_get_watcher_pause(params):
924
    """Get watcher pause end.
925

926
    """
927
    return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
928

    
929
  @staticmethod
930
  def perspective_set_watcher_pause(params):
931
    """Set watcher pause.
932

933
    """
934
    (until, ) = params
935
    return backend.SetWatcherPause(until)
936

    
937
  # os -----------------------
938

    
939
  @staticmethod
940
  def perspective_os_diagnose(params):
941
    """Query detailed information about existing OSes.
942

943
    """
944
    return backend.DiagnoseOS()
945

    
946
  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    os_name = params[0]
    return backend.OSFromDisk(os_name).ToDict()
954

    
955
  @staticmethod
956
  def perspective_os_validate(params):
957
    """Run a given OS' validation routine.
958

959
    """
960
    required, name, checks, params = params
961
    return backend.ValidateOS(required, name, checks, params)
962

    
963
  # extstorage -----------------------
964

    
965
  @staticmethod
966
  def perspective_extstorage_diagnose(params):
967
    """Query detailed information about existing extstorage providers.
968

969
    """
970
    return backend.DiagnoseExtStorage()
971

    
972
  # hooks -----------------------
973

    
974
  @staticmethod
975
  def perspective_hooks_runner(params):
976
    """Run hook scripts.
977

978
    """
979
    hpath, phase, env = params
980
    hr = backend.HooksRunner()
981
    return hr.RunHooks(hpath, phase, env)
982

    
983
  # iallocator -----------------
984

    
985
  @staticmethod
986
  def perspective_iallocator_runner(params):
987
    """Run an iallocator script.
988

989
    """
990
    name, idata = params
991
    iar = backend.IAllocatorRunner()
992
    return iar.Run(name, idata)
993

    
994
  # test -----------------------
995

    
996
  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    (success, message) = utils.TestDelay(params[0])
    if not success:
      raise backend.RPCFail(message)
    return message
1006

    
1007
  # file storage ---------------
1008

    
1009
  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory given in C{params[0]}.

    """
    return backend.CreateFileStorageDir(params[0])
1016

    
1017
  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory given in C{params[0]}.

    """
    return backend.RemoveFileStorageDir(params[0])
1024

    
1025
  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename a file storage directory.

    C{params[0]} is the old path, C{params[1]} the new one.

    """
    return backend.RenameFileStorageDir(params[0], params[1])
1034

    
1035
  # jobs ------------------------
1036

    
1037
  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Write a job queue file; requires the queue lock.

    """
    path, contents = params
    return backend.JobQueueUpdate(path, contents)
1045

    
1046
  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge the job queue; requires the queue lock.  C{params} is unused.

    """
    return backend.JobQueuePurge()
1053

    
1054
  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename job queue files; requires the queue lock.

    C{params[0]} is a list of (old, new) path pairs; one result per pair
    is returned.

    """
    # TODO: What if a file fails to rename?
    results = []
    for old, new in params[0]:
      results.append(backend.JobQueueRename(old, new))
    return results
1062

    
1063
  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_set_drain_flag(params):
    """Set or clear the job queue's drain flag; requires the queue lock.

    """
    (drain_flag, ) = params
    return jstore.SetDrainFlag(drain_flag)
1072

    
1073
  # hypervisor ---------------
1074

    
1075
  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate hypervisor parameters for the given hypervisor.

    """
    hv_name, hv_params = params
    return backend.ValidateHVParams(hv_name, hv_params)
1082

    
1083
  # Crypto
1084

    
1085
  @staticmethod
  def perspective_x509_cert_create(params):
    """Create a new X509 certificate for SSL/TLS.

    C{params[0]} is the certificate's validity period.

    """
    (valid_for, ) = params
    return backend.CreateX509Certificate(valid_for)
1092

    
1093
  @staticmethod
  def perspective_x509_cert_remove(params):
    """Remove an X509 certificate by name.

    """
    (cert_name, ) = params
    return backend.RemoveX509Certificate(cert_name)
1100

    
1101
  # Import and export
1102

    
1103
  @staticmethod
  def perspective_import_start(params):
    """Start an import daemon for the given instance.

    """
    (serialized_opts, inst_dict, component, (dest, dest_args)) = params

    opts = objects.ImportExportOptions.FromDict(serialized_opts)
    instance = objects.Instance.FromDict(inst_dict)
    ieio_args = _DecodeImportExportIO(dest, dest_args)

    # Imports listen locally, so no remote host/port is passed
    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None, instance, component,
                                           dest, ieio_args)
1118

    
1119
  @staticmethod
  def perspective_export_start(params):
    """Start an export daemon sending instance data to a remote endpoint.

    """
    (serialized_opts, host, port, inst_dict, component,
     (source, source_args)) = params

    opts = objects.ImportExportOptions.FromDict(serialized_opts)
    instance = objects.Instance.FromDict(inst_dict)
    ieio_args = _DecodeImportExportIO(source, source_args)

    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port, instance, component,
                                           source, ieio_args)
1134

    
1135
  @staticmethod
  def perspective_impexp_status(params):
    """Retrieve the status of import/export daemons named in C{params[0]}.

    """
    names = params[0]
    return backend.GetImportExportStatus(names)
1141

    
1142
  @staticmethod
  def perspective_impexp_abort(params):
    """Abort the import/export identified by C{params[0]}.

    """
    daemon_name = params[0]
    return backend.AbortImportExport(daemon_name)
1148

    
1149
  @staticmethod
  def perspective_impexp_cleanup(params):
    """Clean up after the import/export identified by C{params[0]}.

    """
    daemon_name = params[0]
    return backend.CleanupImportExport(daemon_name)
1155

    
1156

    
1157
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Exits with C{constants.EXIT_FAILURE} if positional arguments were
  given or if the "string-escape" codec is not available.

  """
  if args:
    # noded doesn't take any positional arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    # "string-escape" is needed for job queue/RPC payload handling; a
    # missing codec indicates a broken Python installation
    codecs.lookup("string-escape")
  except LookupError:
    print >> sys.stderr, ("Can't load the string-escape code which is part"
                          " of the Python installation. Is your installation"
                          " complete/correct? Aborting.")
    sys.exit(constants.EXIT_FAILURE)
1172

    
1173

    
1174
def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  Selects the request executor, reads the SSL configuration, verifies
  the job queue and builds the HTTP server.

  @return: (mainloop, server) tuple, handed to L{ExecNoded}

  """
  # Default to the plain executor; upgrade to the memory-locking one
  # only when requested and supported by the runtime.
  executor_cls = http.server.HttpServerRequestExecutor
  if options.mlock:
    executor_cls = MlockallRequestExecutor
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")
      executor_cls = http.server.HttpServerRequestExecutor

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  queue_err = _PrepareQueueLock()
  if queue_err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s",
                     queue_err)

  handler = NodeRequestHandler()

  mainloop = daemon.Mainloop()
  server = http.server.HttpServer(mainloop, options.bind_address,
                                  options.port, handler,
                                  ssl_params=ssl_params, ssl_verify_peer=True,
                                  request_executor_class=executor_cls)
  server.Start()

  return (mainloop, server)
1212

    
1213

    
1214
def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  Runs the main loop from C{prep_data} and always stops the server on
  the way out, even if the loop raises.

  """
  mainloop, server = prep_data
  try:
    mainloop.Run()
  finally:
    server.Stop()
1223

    
1224

    
1225
def Main():
  """Main function for the node daemon.

  Builds the option parser and hands control to the generic daemon
  startup code.

  """
  usage = ("%prog [-f] [-d] [-p port] [-b ADDRESS]"
           " [-i INTERFACE]")
  version = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti node daemon",
                        usage=usage, version=version)
  parser.add_option("--no-mlock", dest="mlock", action="store_false",
                    default=True,
                    help="Do not mlock the node memory in ram")

  # noded uses the same certificate for both the SSL cert and key
  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True)