Statistics
| Branch: | Tag: | Revision:

root / lib / server / noded.py @ ee501db1

History | View | Annotate | Download (29.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# pylint: disable=C0103,W0142
25

    
26
# C0103: Functions in this module need to have a given name structure,
27
# and the name of the daemon doesn't match
28

    
29
# W0142: Used * or ** magic, since we do use it extensively in this
30
# module
31

    
32
import os
33
import sys
34
import logging
35
import signal
36
import codecs
37

    
38
from optparse import OptionParser
39

    
40
from ganeti import backend
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import errors
44
from ganeti import jstore
45
from ganeti import daemon
46
from ganeti import http
47
from ganeti import utils
48
from ganeti import storage
49
from ganeti import serializer
50
from ganeti import netutils
51
from ganeti import pathutils
52
from ganeti import ssconf
53

    
54
import ganeti.http.server # pylint: disable=W0611
55

    
56

    
57
# Global job queue lock handle; lazily initialized by _PrepareQueueLock()
queue_lock = None
58

    
59

    
60
def _PrepareQueueLock():
  """Try to prepare the queue lock.

  Initializes the module-level C{queue_lock} via
  L{jstore.InitAndVerifyQueue} if it has not been set up yet; the
  call is idempotent.

  @return: None for success, otherwise an exception object

  """
  global queue_lock # pylint: disable=W0603

  # Already initialized, nothing to do
  if queue_lock is not None:
    return None

  # Prepare job queue
  try:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
    return None
  except EnvironmentError, err:
    # Caller decides whether a failed queue setup is fatal
    return err
77

    
78

    
79
def _RequireJobQueueLock(fn):
  """Decorator ensuring the job queue lock is held around C{fn}.

  The lock is acquired in exclusive, blocking mode because several
  children may run at the same time; we wait at most ten seconds.

  """
  timeout = 10

  def locked_fn(*args, **kwargs):
    # The queue lock must exist and be verified before touching the queue
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=timeout)
    try:
      return fn(*args, **kwargs)
    finally:
      # Release even if the wrapped function raised
      queue_lock.Unlock()

  return locked_fn
98

    
99

    
100
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  For raw-disk and script I/O the first argument is a serialized disk
  object, which is deserialized here; any other I/O type is returned
  unchanged.

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    disk = objects.Disk.FromDict(ieioargs[0])
    return (disk, ieioargs[1])
  else:
    return ieioargs
113

    
114

    
115
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Subclass ensuring request handlers are locked in RAM.

  """
  def __init__(self, *args, **kwargs):
    # Lock the process memory before handling any request, so page
    # faults cannot delay request processing
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
123

    
124

    
125
class NodeRequestHandler(http.server.HttpServerHandler):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  """
  # too many public methods, and unused args - all methods get params
  # due to the API
  # pylint: disable=R0904,W0613
  def __init__(self):
    http.server.HttpServerHandler.__init__(self)
    # PID of the daemon process, used to signal ourselves on shutdown
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle a request.

    Dispatches the request path to the matching C{perspective_*}
    method and returns the JSON-serialized C{(success, payload)}
    result tuple.

    """
    # FIXME: Remove HTTP_PUT in Ganeti 2.7
    if req.request_method.upper() not in (http.HTTP_PUT, http.HTTP_POST):
      raise http.HttpBadRequest("Only PUT and POST methods are supported")

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    # Map e.g. "/blockdev_create" to self.perspective_blockdev_create
    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      result = (True, method(serializer.LoadJson(req.request_body)))

    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      result = err.args
    except Exception, err:
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))

    return serializer.DumpJson(result)

  # the new block devices  --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_pause_resume_sync(params):
    """Pause/resume sync of a block device.

    """
    disks_s, pause = params
    disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
    return backend.BlockdevPauseResumeSync(disks, pause)

  @staticmethod
  def perspective_blockdev_wipe(params):
    """Wipe a block device.

    """
    bdev_s, offset, size = params
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevWipe(bdev, offset, size)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary, idx = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary, idx)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the callers
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params[0]]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_getmirrorstatus_multi(params):
    """Return the mirror status for a list of disks.

    """
    (node_disks, ) = params

    disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]

    result = []

    # Status objects are serialized only for successful queries; a
    # failed entry carries the raw (error) status value instead
    for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
      if success:
        result.append((success, status.ToDict()))
      else:
        result.append((success, status))

    return result

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    # Older masters sent only three parameters (no backingstore flag)
    if len(params) < 4:
      raise ValueError("Received only 3 parameters in blockdev_grow,"
                       " old master?")
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    dryrun = params[2]
    backingstore = params[3]
    return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  @staticmethod
  def perspective_blockdev_getsize(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetsize(disks)

  @staticmethod
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                     instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_helper(params):
    """Query drbd helper.

    """
    return backend.GetDrbdUsermodeHelper()

  # export/import  --------------------------

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])

    # Each entry is either a boolean (snapshot failed/skipped) or a
    # serialized disk object
    snap_disks = []
    for disk in params[1]:
      if isinstance(disk, bool):
        snap_disks.append(disk)
      else:
        snap_disks.append(objects.Disk.FromDict(disk))

    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (pathutils.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # block device ---------------------
  @staticmethod
  def perspective_bdev_sizes(params):
    """Query the list of block devices

    """
    devices = params[0]
    return backend.GetBlockDevSizes(devices)

  # volume  --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)

  # bridge  --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance  --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    debug = params[2]
    return backend.InstanceOsAdd(inst, reinstall, debug)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name, debug = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name, debug)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    return backend.InstanceShutdown(instance, timeout)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    # NOTE(review): despite its name, "instance_name" holds a full
    # serialized instance dict (it's passed to FromDict below)
    (instance_name, startup_paused) = params
    instance = objects.Instance.FromDict(instance_name)
    return backend.StartInstance(instance, startup_paused)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_instance_finalize_migration_dst(params):
    """Finalize the instance migration on the destination node.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigrationDst(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_finalize_migration_src(params):
    """Finalize the instance migration on the source node.

    """
    instance, success, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigrationSource(instance, success, live)

  @staticmethod
  def perspective_instance_get_migration_status(params):
    """Reports migration status.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetMigrationStatus(instance).ToDict()

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)

  @staticmethod
  def perspective_instance_balloon_memory(params):
    """Modify instance runtime memory.

    """
    instance_dict, memory = params
    instance = objects.Instance.FromDict(instance_dict)
    return backend.InstanceBalloonMemory(instance, memory)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0], params[1])

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    """
    return netutils.IPAddress.Own(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    (vg_names, hv_names) = params
    return backend.GetNodeInfo(vg_names, hv_names)

  @staticmethod
  def perspective_etc_hosts_modify(params):
    """Modify a node entry in /etc/hosts.

    """
    backend.EtcHostsModify(params[0], params[1], params[2])

    return True

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master_daemons(params):
    """Start the master daemons on this node.

    """
    return backend.StartMasterDaemons(params[0])

  @staticmethod
  def perspective_node_activate_master_ip(params):
    """Activate the master IP on this node.

    """
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
    return backend.ActivateMasterIp(master_params, params[1])

  @staticmethod
  def perspective_node_deactivate_master_ip(params):
    """Deactivate the master IP on this node.

    """
    master_params = objects.MasterNetworkParameters.FromDict(params[0])
    return backend.DeactivateMasterIp(master_params, params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Stops master daemons on this node.

    """
    return backend.StopMasterDaemons()

  @staticmethod
  def perspective_node_change_master_netmask(params):
    """Change the master IP netmask.

    """
    return backend.ChangeMasterNetmask(params[0], params[1], params[2],
                                       params[3])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()

  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)

  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*(params[0]))

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_run_oob(params):
    """Runs oob on node.

    """
    output = backend.RunOob(params[0], params[1], params[2], params[3])
    if output:
      result = serializer.LoadJson(output)
    else:
      result = None
    return result

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return ssconf.WriteSsconfFiles(values)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  @staticmethod
  def perspective_os_validate(params):
    """Run a given OS' validation routine.

    """
    required, name, checks, params = params
    return backend.ValidateOS(required, name, checks, params)

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params[0]]

  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)

  # Crypto

  @staticmethod
  def perspective_x509_cert_create(params):
    """Creates a new X509 certificate for SSL/TLS.

    """
    (validity, ) = params
    return backend.CreateX509Certificate(validity)

  @staticmethod
  def perspective_x509_cert_remove(params):
    """Removes a X509 certificate.

    """
    (name, ) = params
    return backend.RemoveX509Certificate(name)

  # Import and export

  @staticmethod
  def perspective_import_start(params):
    """Starts an import daemon.

    """
    (opts_s, instance, component, (dest, dest_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                           None, None,
                                           objects.Instance.FromDict(instance),
                                           component, dest,
                                           _DecodeImportExportIO(dest,
                                                                 dest_args))

  @staticmethod
  def perspective_export_start(params):
    """Starts an export daemon.

    """
    (opts_s, host, port, instance, component, (source, source_args)) = params

    opts = objects.ImportExportOptions.FromDict(opts_s)

    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                           host, port,
                                           objects.Instance.FromDict(instance),
                                           component, source,
                                           _DecodeImportExportIO(source,
                                                                 source_args))

  @staticmethod
  def perspective_impexp_status(params):
    """Retrieves the status of an import or export daemon.

    """
    return backend.GetImportExportStatus(params[0])

  @staticmethod
  def perspective_impexp_abort(params):
    """Aborts an import or export.

    """
    return backend.AbortImportExport(params[0])

  @staticmethod
  def perspective_impexp_cleanup(params):
    """Cleans up after an import or export.

    """
    return backend.CleanupImportExport(params[0])
1012

    
1013

    
1014
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Exits the process with C{constants.EXIT_FAILURE} if extra arguments
  were given or the Python installation is incomplete.

  """
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
  try:
    # Sanity-check that the codecs machinery of the installation works
    codecs.lookup("string-escape")
  except LookupError:
    print >> sys.stderr, ("Can't load the string-escape code which is part"
                          " of the Python installation. Is your installation"
                          " complete/correct? Aborting.")
    sys.exit(constants.EXIT_FAILURE)
1029

    
1030

    
1031
def PrepNoded(options, _):
  """Preparation node daemon function, executed with the PID file held.

  Chooses the request executor, loads the SSL certificate, verifies
  the job queue and starts the HTTP server.

  @return: the C{(mainloop, server)} pair for L{ExecNoded}

  """
  # Default to the plain executor; switch to the mlock'ing one only
  # when requested and when ctypes is actually available
  request_executor_class = http.server.HttpServerRequestExecutor
  if options.mlock:
    try:
      utils.Mlockall()
      request_executor_class = MlockallRequestExecutor
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")

  # Read SSL certificate
  ssl_params = None
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  mainloop = daemon.Mainloop()
  server = http.server.HttpServer(
      mainloop, options.bind_address, options.port, NodeRequestHandler(),
      ssl_params=ssl_params, ssl_verify_peer=True,
      request_executor_class=request_executor_class)
  server.Start()

  return (mainloop, server)
1069

    
1070

    
1071
def ExecNoded(options, args, prep_data): # pylint: disable=W0613
  """Main node daemon function, executed with the PID file held.

  @param prep_data: the C{(mainloop, server)} pair from L{PrepNoded}

  """
  mainloop, server = prep_data
  try:
    mainloop.Run()
  finally:
    # Always shut the server down, even if the main loop raised
    server.Stop()
1080

    
1081

    
1082
def Main():
  """Main function for the node daemon.

  Builds the command-line parser and delegates to the generic daemon
  main function.

  """
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version=version_string)
  parser.add_option("--no-mlock", dest="mlock", action="store_false",
                    default=True,
                    help="Do not mlock the node memory in ram")

  daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
                     default_ssl_cert=pathutils.NODED_CERT_FILE,
                     default_ssl_key=pathutils.NODED_CERT_FILE,
                     console_logging=True)