Statistics
| Branch: | Tag: | Revision:

root / lib / server / noded.py @ 2be7273c

History | View | Annotate | Download (27.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable-msg=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
33
import sys
34
import logging
35
import signal
36

    
37
from optparse import OptionParser
38

    
39
from ganeti import backend
40
from ganeti import constants
41
from ganeti import objects
42
from ganeti import errors
43
from ganeti import jstore
44
from ganeti import daemon
45
from ganeti import http
46
from ganeti import utils
47
from ganeti import storage
48
from ganeti import serializer
49
from ganeti import netutils
50

    
51
import ganeti.http.server # pylint: disable-msg=W0611
52

    
53

    
54
queue_lock = None
55

    
56

    
57
def _PrepareQueueLock():
58
  """Try to prepare the queue lock.
59

60
  @return: None for success, otherwise an exception object
61

62
  """
63
  global queue_lock # pylint: disable-msg=W0603
64

    
65
  if queue_lock is not None:
66
    return None
67

    
68
  # Prepare job queue
69
  try:
70
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
71
    return None
72
  except EnvironmentError, err:
73
    return err
74

    
75

    
76
def _RequireJobQueueLock(fn):
77
  """Decorator for job queue manipulating functions.
78

79
  """
80
  QUEUE_LOCK_TIMEOUT = 10
81

    
82
  def wrapper(*args, **kwargs):
83
    # Locking in exclusive, blocking mode because there could be several
84
    # children running at the same time. Waiting up to 10 seconds.
85
    if _PrepareQueueLock() is not None:
86
      raise errors.JobQueueError("Job queue failed initialization,"
87
                                 " cannot update jobs")
88
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
89
    try:
90
      return fn(*args, **kwargs)
91
    finally:
92
      queue_lock.Unlock()
93

    
94
  return wrapper
95

    
96

    
97
def _DecodeImportExportIO(ieio, ieioargs):
98
  """Decodes import/export I/O information.
99

100
  """
101
  if ieio == constants.IEIO_RAW_DISK:
102
    assert len(ieioargs) == 1
103
    return (objects.Disk.FromDict(ieioargs[0]), )
104

    
105
  if ieio == constants.IEIO_SCRIPT:
106
    assert len(ieioargs) == 2
107
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
108

    
109
  return ieioargs
110

    
111

    
112
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
113
  """Custom Request Executor class that ensures NodeHttpServer children are
114
  locked in ram.
115

116
  """
117
  def __init__(self, *args, **kwargs):
118
    utils.Mlockall()
119

    
120
    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
121

    
122

    
123
class NodeHttpServer(http.server.HttpServer):
124
  """The server implementation.
125

126
  This class holds all methods exposed over the RPC interface.
127

128
  """
129
  # too many public methods, and unused args - all methods get params
130
  # due to the API
131
  # pylint: disable-msg=R0904,W0613
132
  def __init__(self, *args, **kwargs):
133
    http.server.HttpServer.__init__(self, *args, **kwargs)
134
    self.noded_pid = os.getpid()
135

    
136
  def HandleRequest(self, req):
137
    """Handle a request.
138

139
    """
140
    if req.request_method.upper() != http.HTTP_PUT:
141
      raise http.HttpBadRequest()
142

    
143
    path = req.request_path
144
    if path.startswith("/"):
145
      path = path[1:]
146

    
147
    method = getattr(self, "perspective_%s" % path, None)
148
    if method is None:
149
      raise http.HttpNotFound()
150

    
151
    try:
152
      result = (True, method(serializer.LoadJson(req.request_body)))
153

    
154
    except backend.RPCFail, err:
155
      # our custom failure exception; str(err) works fine if the
156
      # exception was constructed with a single argument, and in
157
      # this case, err.message == err.args[0] == str(err)
158
      result = (False, str(err))
159
    except errors.QuitGanetiException, err:
160
      # Tell parent to quit
161
      logging.info("Shutting down the node daemon, arguments: %s",
162
                   str(err.args))
163
      os.kill(self.noded_pid, signal.SIGTERM)
164
      # And return the error's arguments, which must be already in
165
      # correct tuple format
166
      result = err.args
167
    except Exception, err:
168
      logging.exception("Error in RPC call")
169
      result = (False, "Error while executing backend function: %s" % str(err))
170

    
171
    return serializer.DumpJson(result, indent=False)
172

    
173
  # the new block devices  --------------------------
174

    
175
  @staticmethod
176
  def perspective_blockdev_create(params):
177
    """Create a block device.
178

179
    """
180
    bdev_s, size, owner, on_primary, info = params
181
    bdev = objects.Disk.FromDict(bdev_s)
182
    if bdev is None:
183
      raise ValueError("can't unserialize data!")
184
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
185

    
186
  @staticmethod
187
  def perspective_blockdev_pause_resume_sync(params):
188
    """Pause/resume sync of a block device.
189

190
    """
191
    disks_s, pause = params
192
    disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
193
    return backend.BlockdevPauseResumeSync(disks, pause)
194

    
195
  @staticmethod
196
  def perspective_blockdev_wipe(params):
197
    """Wipe a block device.
198

199
    """
200
    bdev_s, offset, size = params
201
    bdev = objects.Disk.FromDict(bdev_s)
202
    return backend.BlockdevWipe(bdev, offset, size)
203

    
204
  @staticmethod
205
  def perspective_blockdev_remove(params):
206
    """Remove a block device.
207

208
    """
209
    bdev_s = params[0]
210
    bdev = objects.Disk.FromDict(bdev_s)
211
    return backend.BlockdevRemove(bdev)
212

    
213
  @staticmethod
214
  def perspective_blockdev_rename(params):
215
    """Remove a block device.
216

217
    """
218
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
219
    return backend.BlockdevRename(devlist)
220

    
221
  @staticmethod
222
  def perspective_blockdev_assemble(params):
223
    """Assemble a block device.
224

225
    """
226
    bdev_s, owner, on_primary, idx = params
227
    bdev = objects.Disk.FromDict(bdev_s)
228
    if bdev is None:
229
      raise ValueError("can't unserialize data!")
230
    return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
231

    
232
  @staticmethod
233
  def perspective_blockdev_shutdown(params):
234
    """Shutdown a block device.
235

236
    """
237
    bdev_s = params[0]
238
    bdev = objects.Disk.FromDict(bdev_s)
239
    if bdev is None:
240
      raise ValueError("can't unserialize data!")
241
    return backend.BlockdevShutdown(bdev)
242

    
243
  @staticmethod
244
  def perspective_blockdev_addchildren(params):
245
    """Add a child to a mirror device.
246

247
    Note: this is only valid for mirror devices. It's the caller's duty
248
    to send a correct disk, otherwise we raise an error.
249

250
    """
251
    bdev_s, ndev_s = params
252
    bdev = objects.Disk.FromDict(bdev_s)
253
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
254
    if bdev is None or ndevs.count(None) > 0:
255
      raise ValueError("can't unserialize data!")
256
    return backend.BlockdevAddchildren(bdev, ndevs)
257

    
258
  @staticmethod
259
  def perspective_blockdev_removechildren(params):
260
    """Remove a child from a mirror device.
261

262
    This is only valid for mirror devices, of course. It's the callers
263
    duty to send a correct disk, otherwise we raise an error.
264

265
    """
266
    bdev_s, ndev_s = params
267
    bdev = objects.Disk.FromDict(bdev_s)
268
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
269
    if bdev is None or ndevs.count(None) > 0:
270
      raise ValueError("can't unserialize data!")
271
    return backend.BlockdevRemovechildren(bdev, ndevs)
272

    
273
  @staticmethod
274
  def perspective_blockdev_getmirrorstatus(params):
275
    """Return the mirror status for a list of disks.
276

277
    """
278
    disks = [objects.Disk.FromDict(dsk_s)
279
             for dsk_s in params]
280
    return [status.ToDict()
281
            for status in backend.BlockdevGetmirrorstatus(disks)]
282

    
283
  @staticmethod
284
  def perspective_blockdev_getmirrorstatus_multi(params):
285
    """Return the mirror status for a list of disks.
286

287
    """
288
    (node_disks, ) = params
289

    
290
    node_name = netutils.Hostname.GetSysName()
291

    
292
    disks = [objects.Disk.FromDict(dsk_s)
293
             for dsk_s in node_disks.get(node_name, [])]
294

    
295
    result = []
296

    
297
    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
298
      if success:
299
        result.append((success, status.ToDict()))
300
      else:
301
        result.append((success, status))
302

    
303
    return result
304

    
305
  @staticmethod
306
  def perspective_blockdev_find(params):
307
    """Expose the FindBlockDevice functionality for a disk.
308

309
    This will try to find but not activate a disk.
310

311
    """
312
    disk = objects.Disk.FromDict(params[0])
313

    
314
    result = backend.BlockdevFind(disk)
315
    if result is None:
316
      return None
317

    
318
    return result.ToDict()
319

    
320
  @staticmethod
321
  def perspective_blockdev_snapshot(params):
322
    """Create a snapshot device.
323

324
    Note that this is only valid for LVM disks, if we get passed
325
    something else we raise an exception. The snapshot device can be
326
    remove by calling the generic block device remove call.
327

328
    """
329
    cfbd = objects.Disk.FromDict(params[0])
330
    return backend.BlockdevSnapshot(cfbd)
331

    
332
  @staticmethod
333
  def perspective_blockdev_grow(params):
334
    """Grow a stack of devices.
335

336
    """
337
    cfbd = objects.Disk.FromDict(params[0])
338
    amount = params[1]
339
    return backend.BlockdevGrow(cfbd, amount)
340

    
341
  @staticmethod
342
  def perspective_blockdev_close(params):
343
    """Closes the given block devices.
344

345
    """
346
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
347
    return backend.BlockdevClose(params[0], disks)
348

    
349
  @staticmethod
350
  def perspective_blockdev_getsize(params):
351
    """Compute the sizes of the given block devices.
352

353
    """
354
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
355
    return backend.BlockdevGetsize(disks)
356

    
357
  @staticmethod
358
  def perspective_blockdev_export(params):
359
    """Compute the sizes of the given block devices.
360

361
    """
362
    disk = objects.Disk.FromDict(params[0])
363
    dest_node, dest_path, cluster_name = params[1:]
364
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
365

    
366
  # blockdev/drbd specific methods ----------
367

    
368
  @staticmethod
369
  def perspective_drbd_disconnect_net(params):
370
    """Disconnects the network connection of drbd disks.
371

372
    Note that this is only valid for drbd disks, so the members of the
373
    disk list must all be drbd devices.
374

375
    """
376
    nodes_ip, disks = params
377
    disks = [objects.Disk.FromDict(cf) for cf in disks]
378
    return backend.DrbdDisconnectNet(nodes_ip, disks)
379

    
380
  @staticmethod
381
  def perspective_drbd_attach_net(params):
382
    """Attaches the network connection of drbd disks.
383

384
    Note that this is only valid for drbd disks, so the members of the
385
    disk list must all be drbd devices.
386

387
    """
388
    nodes_ip, disks, instance_name, multimaster = params
389
    disks = [objects.Disk.FromDict(cf) for cf in disks]
390
    return backend.DrbdAttachNet(nodes_ip, disks,
391
                                     instance_name, multimaster)
392

    
393
  @staticmethod
394
  def perspective_drbd_wait_sync(params):
395
    """Wait until DRBD disks are synched.
396

397
    Note that this is only valid for drbd disks, so the members of the
398
    disk list must all be drbd devices.
399

400
    """
401
    nodes_ip, disks = params
402
    disks = [objects.Disk.FromDict(cf) for cf in disks]
403
    return backend.DrbdWaitSync(nodes_ip, disks)
404

    
405
  @staticmethod
406
  def perspective_drbd_helper(params):
407
    """Query drbd helper.
408

409
    """
410
    return backend.GetDrbdUsermodeHelper()
411

    
412
  # export/import  --------------------------
413

    
414
  @staticmethod
415
  def perspective_finalize_export(params):
416
    """Expose the finalize export functionality.
417

418
    """
419
    instance = objects.Instance.FromDict(params[0])
420

    
421
    snap_disks = []
422
    for disk in params[1]:
423
      if isinstance(disk, bool):
424
        snap_disks.append(disk)
425
      else:
426
        snap_disks.append(objects.Disk.FromDict(disk))
427

    
428
    return backend.FinalizeExport(instance, snap_disks)
429

    
430
  @staticmethod
431
  def perspective_export_info(params):
432
    """Query information about an existing export on this node.
433

434
    The given path may not contain an export, in which case we return
435
    None.
436

437
    """
438
    path = params[0]
439
    return backend.ExportInfo(path)
440

    
441
  @staticmethod
442
  def perspective_export_list(params):
443
    """List the available exports on this node.
444

445
    Note that as opposed to export_info, which may query data about an
446
    export in any path, this only queries the standard Ganeti path
447
    (constants.EXPORT_DIR).
448

449
    """
450
    return backend.ListExports()
451

    
452
  @staticmethod
453
  def perspective_export_remove(params):
454
    """Remove an export.
455

456
    """
457
    export = params[0]
458
    return backend.RemoveExport(export)
459

    
460
  # block device ---------------------
461
  @staticmethod
462
  def perspective_bdev_sizes(params):
463
    """Query the list of block devices
464

465
    """
466
    devices = params[0]
467
    return backend.GetBlockDevSizes(devices)
468

    
469
  # volume  --------------------------
470

    
471
  @staticmethod
472
  def perspective_lv_list(params):
473
    """Query the list of logical volumes in a given volume group.
474

475
    """
476
    vgname = params[0]
477
    return backend.GetVolumeList(vgname)
478

    
479
  @staticmethod
480
  def perspective_vg_list(params):
481
    """Query the list of volume groups.
482

483
    """
484
    return backend.ListVolumeGroups()
485

    
486
  # Storage --------------------------
487

    
488
  @staticmethod
489
  def perspective_storage_list(params):
490
    """Get list of storage units.
491

492
    """
493
    (su_name, su_args, name, fields) = params
494
    return storage.GetStorage(su_name, *su_args).List(name, fields)
495

    
496
  @staticmethod
497
  def perspective_storage_modify(params):
498
    """Modify a storage unit.
499

500
    """
501
    (su_name, su_args, name, changes) = params
502
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)
503

    
504
  @staticmethod
505
  def perspective_storage_execute(params):
506
    """Execute an operation on a storage unit.
507

508
    """
509
    (su_name, su_args, name, op) = params
510
    return storage.GetStorage(su_name, *su_args).Execute(name, op)
511

    
512
  # bridge  --------------------------
513

    
514
  @staticmethod
515
  def perspective_bridges_exist(params):
516
    """Check if all bridges given exist on this node.
517

518
    """
519
    bridges_list = params[0]
520
    return backend.BridgesExist(bridges_list)
521

    
522
  # instance  --------------------------
523

    
524
  @staticmethod
525
  def perspective_instance_os_add(params):
526
    """Install an OS on a given instance.
527

528
    """
529
    inst_s = params[0]
530
    inst = objects.Instance.FromDict(inst_s)
531
    reinstall = params[1]
532
    debug = params[2]
533
    return backend.InstanceOsAdd(inst, reinstall, debug)
534

    
535
  @staticmethod
536
  def perspective_instance_run_rename(params):
537
    """Runs the OS rename script for an instance.
538

539
    """
540
    inst_s, old_name, debug = params
541
    inst = objects.Instance.FromDict(inst_s)
542
    return backend.RunRenameInstance(inst, old_name, debug)
543

    
544
  @staticmethod
545
  def perspective_instance_shutdown(params):
546
    """Shutdown an instance.
547

548
    """
549
    instance = objects.Instance.FromDict(params[0])
550
    timeout = params[1]
551
    return backend.InstanceShutdown(instance, timeout)
552

    
553
  @staticmethod
554
  def perspective_instance_start(params):
555
    """Start an instance.
556

557
    """
558
    instance = objects.Instance.FromDict(params[0])
559
    return backend.StartInstance(instance)
560

    
561
  @staticmethod
562
  def perspective_migration_info(params):
563
    """Gather information about an instance to be migrated.
564

565
    """
566
    instance = objects.Instance.FromDict(params[0])
567
    return backend.MigrationInfo(instance)
568

    
569
  @staticmethod
570
  def perspective_accept_instance(params):
571
    """Prepare the node to accept an instance.
572

573
    """
574
    instance, info, target = params
575
    instance = objects.Instance.FromDict(instance)
576
    return backend.AcceptInstance(instance, info, target)
577

    
578
  @staticmethod
579
  def perspective_finalize_migration(params):
580
    """Finalize the instance migration.
581

582
    """
583
    instance, info, success = params
584
    instance = objects.Instance.FromDict(instance)
585
    return backend.FinalizeMigration(instance, info, success)
586

    
587
  @staticmethod
588
  def perspective_instance_migrate(params):
589
    """Migrates an instance.
590

591
    """
592
    instance, target, live = params
593
    instance = objects.Instance.FromDict(instance)
594
    return backend.MigrateInstance(instance, target, live)
595

    
596
  @staticmethod
597
  def perspective_instance_reboot(params):
598
    """Reboot an instance.
599

600
    """
601
    instance = objects.Instance.FromDict(params[0])
602
    reboot_type = params[1]
603
    shutdown_timeout = params[2]
604
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
605

    
606
  @staticmethod
607
  def perspective_instance_info(params):
608
    """Query instance information.
609

610
    """
611
    return backend.GetInstanceInfo(params[0], params[1])
612

    
613
  @staticmethod
614
  def perspective_instance_migratable(params):
615
    """Query whether the specified instance can be migrated.
616

617
    """
618
    instance = objects.Instance.FromDict(params[0])
619
    return backend.GetInstanceMigratable(instance)
620

    
621
  @staticmethod
622
  def perspective_all_instances_info(params):
623
    """Query information about all instances.
624

625
    """
626
    return backend.GetAllInstancesInfo(params[0])
627

    
628
  @staticmethod
629
  def perspective_instance_list(params):
630
    """Query the list of running instances.
631

632
    """
633
    return backend.GetInstanceList(params[0])
634

    
635
  # node --------------------------
636

    
637
  @staticmethod
638
  def perspective_node_tcp_ping(params):
639
    """Do a TcpPing on the remote node.
640

641
    """
642
    return netutils.TcpPing(params[1], params[2], timeout=params[3],
643
                            live_port_needed=params[4], source=params[0])
644

    
645
  @staticmethod
646
  def perspective_node_has_ip_address(params):
647
    """Checks if a node has the given ip address.
648

649
    """
650
    return netutils.IPAddress.Own(params[0])
651

    
652
  @staticmethod
653
  def perspective_node_info(params):
654
    """Query node information.
655

656
    """
657
    vgname, hypervisor_type = params
658
    return backend.GetNodeInfo(vgname, hypervisor_type)
659

    
660
  @staticmethod
661
  def perspective_etc_hosts_modify(params):
662
    """Modify a node entry in /etc/hosts.
663

664
    """
665
    backend.EtcHostsModify(params[0], params[1], params[2])
666

    
667
    return True
668

    
669
  @staticmethod
670
  def perspective_node_verify(params):
671
    """Run a verify sequence on this node.
672

673
    """
674
    return backend.VerifyNode(params[0], params[1])
675

    
676
  @staticmethod
677
  def perspective_node_start_master(params):
678
    """Promote this node to master status.
679

680
    """
681
    return backend.StartMaster(params[0], params[1])
682

    
683
  @staticmethod
684
  def perspective_node_stop_master(params):
685
    """Demote this node from master status.
686

687
    """
688
    return backend.StopMaster(params[0])
689

    
690
  @staticmethod
691
  def perspective_node_leave_cluster(params):
692
    """Cleanup after leaving a cluster.
693

694
    """
695
    return backend.LeaveCluster(params[0])
696

    
697
  @staticmethod
698
  def perspective_node_volumes(params):
699
    """Query the list of all logical volume groups.
700

701
    """
702
    return backend.NodeVolumes()
703

    
704
  @staticmethod
705
  def perspective_node_demote_from_mc(params):
706
    """Demote a node from the master candidate role.
707

708
    """
709
    return backend.DemoteFromMC()
710

    
711

    
712
  @staticmethod
713
  def perspective_node_powercycle(params):
714
    """Tries to powercycle the nod.
715

716
    """
717
    hypervisor_type = params[0]
718
    return backend.PowercycleNode(hypervisor_type)
719

    
720

    
721
  # cluster --------------------------
722

    
723
  @staticmethod
724
  def perspective_version(params):
725
    """Query version information.
726

727
    """
728
    return constants.PROTOCOL_VERSION
729

    
730
  @staticmethod
731
  def perspective_upload_file(params):
732
    """Upload a file.
733

734
    Note that the backend implementation imposes strict rules on which
735
    files are accepted.
736

737
    """
738
    return backend.UploadFile(*params)
739

    
740
  @staticmethod
741
  def perspective_master_info(params):
742
    """Query master information.
743

744
    """
745
    return backend.GetMasterInfo()
746

    
747
  @staticmethod
748
  def perspective_run_oob(params):
749
    """Runs oob on node.
750

751
    """
752
    output = backend.RunOob(params[0], params[1], params[2], params[3])
753
    if output:
754
      result = serializer.LoadJson(output)
755
    else:
756
      result = None
757
    return result
758

    
759
  @staticmethod
760
  def perspective_write_ssconf_files(params):
761
    """Write ssconf files.
762

763
    """
764
    (values,) = params
765
    return backend.WriteSsconfFiles(values)
766

    
767
  # os -----------------------
768

    
769
  @staticmethod
770
  def perspective_os_diagnose(params):
771
    """Query detailed information about existing OSes.
772

773
    """
774
    return backend.DiagnoseOS()
775

    
776
  @staticmethod
777
  def perspective_os_get(params):
778
    """Query information about a given OS.
779

780
    """
781
    name = params[0]
782
    os_obj = backend.OSFromDisk(name)
783
    return os_obj.ToDict()
784

    
785
  @staticmethod
786
  def perspective_os_validate(params):
787
    """Run a given OS' validation routine.
788

789
    """
790
    required, name, checks, params = params
791
    return backend.ValidateOS(required, name, checks, params)
792

    
793
  # hooks -----------------------
794

    
795
  @staticmethod
796
  def perspective_hooks_runner(params):
797
    """Run hook scripts.
798

799
    """
800
    hpath, phase, env = params
801
    hr = backend.HooksRunner()
802
    return hr.RunHooks(hpath, phase, env)
803

    
804
  # iallocator -----------------
805

    
806
  @staticmethod
807
  def perspective_iallocator_runner(params):
808
    """Run an iallocator script.
809

810
    """
811
    name, idata = params
812
    iar = backend.IAllocatorRunner()
813
    return iar.Run(name, idata)
814

    
815
  # test -----------------------
816

    
817
  @staticmethod
818
  def perspective_test_delay(params):
819
    """Run test delay.
820

821
    """
822
    duration = params[0]
823
    status, rval = utils.TestDelay(duration)
824
    if not status:
825
      raise backend.RPCFail(rval)
826
    return rval
827

    
828
  # file storage ---------------
829

    
830
  @staticmethod
831
  def perspective_file_storage_dir_create(params):
832
    """Create the file storage directory.
833

834
    """
835
    file_storage_dir = params[0]
836
    return backend.CreateFileStorageDir(file_storage_dir)
837

    
838
  @staticmethod
839
  def perspective_file_storage_dir_remove(params):
840
    """Remove the file storage directory.
841

842
    """
843
    file_storage_dir = params[0]
844
    return backend.RemoveFileStorageDir(file_storage_dir)
845

    
846
  @staticmethod
847
  def perspective_file_storage_dir_rename(params):
848
    """Rename the file storage directory.
849

850
    """
851
    old_file_storage_dir = params[0]
852
    new_file_storage_dir = params[1]
853
    return backend.RenameFileStorageDir(old_file_storage_dir,
854
                                        new_file_storage_dir)
855

    
856
  # jobs ------------------------
857

    
858
  @staticmethod
859
  @_RequireJobQueueLock
860
  def perspective_jobqueue_update(params):
861
    """Update job queue.
862

863
    """
864
    (file_name, content) = params
865
    return backend.JobQueueUpdate(file_name, content)
866

    
867
  @staticmethod
868
  @_RequireJobQueueLock
869
  def perspective_jobqueue_purge(params):
870
    """Purge job queue.
871

872
    """
873
    return backend.JobQueuePurge()
874

    
875
  @staticmethod
876
  @_RequireJobQueueLock
877
  def perspective_jobqueue_rename(params):
878
    """Rename a job queue file.
879

880
    """
881
    # TODO: What if a file fails to rename?
882
    return [backend.JobQueueRename(old, new) for old, new in params]
883

    
884
  # hypervisor ---------------
885

    
886
  @staticmethod
887
  def perspective_hypervisor_validate_params(params):
888
    """Validate the hypervisor parameters.
889

890
    """
891
    (hvname, hvparams) = params
892
    return backend.ValidateHVParams(hvname, hvparams)
893

    
894
  # Crypto
895

    
896
  @staticmethod
897
  def perspective_x509_cert_create(params):
898
    """Creates a new X509 certificate for SSL/TLS.
899

900
    """
901
    (validity, ) = params
902
    return backend.CreateX509Certificate(validity)
903

    
904
  @staticmethod
905
  def perspective_x509_cert_remove(params):
906
    """Removes a X509 certificate.
907

908
    """
909
    (name, ) = params
910
    return backend.RemoveX509Certificate(name)
911

    
912
  # Import and export
913

    
914
  @staticmethod
915
  def perspective_import_start(params):
916
    """Starts an import daemon.
917

918
    """
919
    (opts_s, instance, dest, dest_args) = params
920

    
921
    opts = objects.ImportExportOptions.FromDict(opts_s)
922

    
923
    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
924
                                           None, None,
925
                                           objects.Instance.FromDict(instance),
926
                                           dest,
927
                                           _DecodeImportExportIO(dest,
928
                                                                 dest_args))
929

    
930
  @staticmethod
931
  def perspective_export_start(params):
932
    """Starts an export daemon.
933

934
    """
935
    (opts_s, host, port, instance, source, source_args) = params
936

    
937
    opts = objects.ImportExportOptions.FromDict(opts_s)
938

    
939
    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
940
                                           host, port,
941
                                           objects.Instance.FromDict(instance),
942
                                           source,
943
                                           _DecodeImportExportIO(source,
944
                                                                 source_args))
945

    
946
  @staticmethod
947
  def perspective_impexp_status(params):
948
    """Retrieves the status of an import or export daemon.
949

950
    """
951
    return backend.GetImportExportStatus(params[0])
952

    
953
  @staticmethod
954
  def perspective_impexp_abort(params):
955
    """Aborts an import or export.
956

957
    """
958
    return backend.AbortImportExport(params[0])
959

    
960
  @staticmethod
961
  def perspective_impexp_cleanup(params):
962
    """Cleans up after an import or export.
963

964
    """
965
    return backend.CleanupImportExport(params[0])
966

    
967

    
968
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Exits the process with a failure code if any positional arguments
  were given, since noded does not accept any.

  """
  if args: # noded doesn't take any arguments
    sys.stderr.write("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]\n" %
                     sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
976

    
977

    
978
def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  @return: (mainloop, server) tuple, consumed by L{ExecNoded}

  """
  # Default to the plain executor; upgrade to the memory-locking one
  # only if mlock is requested and actually works on this system
  request_executor_class = http.server.HttpServerRequestExecutor
  if options.mlock:
    try:
      utils.Mlockall()
      request_executor_class = MlockallRequestExecutor
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")

  # Read SSL certificate
  ssl_params = None
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True,
                          request_executor_class=request_executor_class)
  server.Start()
  return (mainloop, server)
1012

    
1013

    
1014
def ExecNoded(options, args, prep_data): # pylint: disable-msg=W0613
  """Main node daemon function, executed with the PID file held.

  Runs the main loop prepared by L{PrepNoded} and guarantees the HTTP
  server is stopped afterwards, even if the loop raises.

  """
  mainloop, server = prep_data
  try:
    mainloop.Run()
  finally:
    server.Stop()
1023

    
1024

    
1025
def Main():
  """Main function for the node daemon.

  Builds the command-line parser and hands control to
  L{daemon.GenericMain}, which runs the check/prepare/exec callbacks.

  """
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version=version_string)
  parser.add_option("--no-mlock", dest="mlock",
                    help="Do not mlock the node memory in ram",
                    default=True, action="store_false")

  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=constants.NODED_CERT_FILE,
                     default_ssl_key=constants.NODED_CERT_FILE,
                     console_logging=True)