Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ ff5fac04

History | View | Annotate | Download (17 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import traceback
30
import BaseHTTPServer
31
import simplejson
32
import errno
33
import logging
34

    
35
from optparse import OptionParser
36

    
37

    
38
from ganeti import backend
39
from ganeti import logger
40
from ganeti import constants
41
from ganeti import objects
42
from ganeti import errors
43
from ganeti import ssconf
44
from ganeti import utils
45

    
46
_EXIT_GANETI_NODED = False
47

    
48

    
49
class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
50
  """The server implementation.
51

    
52
  This class holds all methods exposed over the RPC interface.
53

    
54
  """
55
  def do_PUT(self):
56
    """Handle a post request.
57

    
58
    """
59
    path = self.path
60
    if path.startswith("/"):
61
      path = path[1:]
62
    logging.debug("ServerObject: received call '%s'", path)
63
    mname = "perspective_%s" % path
64
    if not hasattr(self, mname):
65
      self.send_error(404)
66
      return False
67

    
68
    method = getattr(self, mname)
69
    try:
70
      body_length = int(self.headers.get('Content-Length', '0'))
71
    except ValueError:
72
      self.send_error(400, 'No Content-Length header or invalid format')
73
      return False
74

    
75
    try:
76
      body = self.rfile.read(body_length)
77
    except socket.error, err:
78
      logger.Error("Socket error while reading: %s" % str(err))
79
      return
80
    try:
81
      params = simplejson.loads(body)
82
      logging.debug("ServerObject: method parameters: %s", params)
83
      result = method(params)
84
      payload = simplejson.dumps(result)
85
    except errors.QuitGanetiException, err:
86
      global _EXIT_GANETI_NODED
87
      _EXIT_GANETI_NODED = True
88
      if isinstance(err.args, tuple) and len(err.args) == 2:
89
        if err.args[0]:
90
          self.send_error(500, "Error: %s" % str(err[1]))
91
        else:
92
          payload = simplejson.dumps(err.args[1])
93
      else:
94
        self.log_message('QuitGanetiException Usage Error')
95
        self.send_error(500, "Error: %s" % str(err))
96
    except Exception, err:
97
      self.send_error(500, "Error: %s" % str(err))
98
      return False
99
    self.send_response(200)
100
    self.send_header('Content-Length', str(len(payload)))
101
    self.end_headers()
102
    self.wfile.write(payload)
103
    return True
104

    
105
  def log_message(self, format, *args):
106
    """Log a request to the log.
107

    
108
    This is the same as the parent, we just log somewhere else.
109

    
110
    """
111
    msg = ("%s - - [%s] %s" %
112
           (self.address_string(),
113
            self.log_date_time_string(),
114
            format % args))
115
    logging.debug(msg)
116

    
117
  # the new block devices  --------------------------
118

    
119
  @staticmethod
120
  def perspective_blockdev_create(params):
121
    """Create a block device.
122

    
123
    """
124
    bdev_s, size, owner, on_primary, info = params
125
    bdev = objects.Disk.FromDict(bdev_s)
126
    if bdev is None:
127
      raise ValueError("can't unserialize data!")
128
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
129

    
130
  @staticmethod
131
  def perspective_blockdev_remove(params):
132
    """Remove a block device.
133

    
134
    """
135
    bdev_s = params[0]
136
    bdev = objects.Disk.FromDict(bdev_s)
137
    return backend.RemoveBlockDevice(bdev)
138

    
139
  @staticmethod
140
  def perspective_blockdev_rename(params):
141
    """Remove a block device.
142

    
143
    """
144
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
145
    return backend.RenameBlockDevices(devlist)
146

    
147
  @staticmethod
148
  def perspective_blockdev_assemble(params):
149
    """Assemble a block device.
150

    
151
    """
152
    bdev_s, owner, on_primary = params
153
    bdev = objects.Disk.FromDict(bdev_s)
154
    if bdev is None:
155
      raise ValueError("can't unserialize data!")
156
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
157

    
158
  @staticmethod
159
  def perspective_blockdev_shutdown(params):
160
    """Shutdown a block device.
161

    
162
    """
163
    bdev_s = params[0]
164
    bdev = objects.Disk.FromDict(bdev_s)
165
    if bdev is None:
166
      raise ValueError("can't unserialize data!")
167
    return backend.ShutdownBlockDevice(bdev)
168

    
169
  @staticmethod
170
  def perspective_blockdev_addchildren(params):
171
    """Add a child to a mirror device.
172

    
173
    Note: this is only valid for mirror devices. It's the caller's duty
174
    to send a correct disk, otherwise we raise an error.
175

    
176
    """
177
    bdev_s, ndev_s = params
178
    bdev = objects.Disk.FromDict(bdev_s)
179
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
180
    if bdev is None or ndevs.count(None) > 0:
181
      raise ValueError("can't unserialize data!")
182
    return backend.MirrorAddChildren(bdev, ndevs)
183

    
184
  @staticmethod
185
  def perspective_blockdev_removechildren(params):
186
    """Remove a child from a mirror device.
187

    
188
    This is only valid for mirror devices, of course. It's the callers
189
    duty to send a correct disk, otherwise we raise an error.
190

    
191
    """
192
    bdev_s, ndev_s = params
193
    bdev = objects.Disk.FromDict(bdev_s)
194
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
195
    if bdev is None or ndevs.count(None) > 0:
196
      raise ValueError("can't unserialize data!")
197
    return backend.MirrorRemoveChildren(bdev, ndevs)
198

    
199
  @staticmethod
200
  def perspective_blockdev_getmirrorstatus(params):
201
    """Return the mirror status for a list of disks.
202

    
203
    """
204
    disks = [objects.Disk.FromDict(dsk_s)
205
            for dsk_s in params]
206
    return backend.GetMirrorStatus(disks)
207

    
208
  @staticmethod
209
  def perspective_blockdev_find(params):
210
    """Expose the FindBlockDevice functionality for a disk.
211

    
212
    This will try to find but not activate a disk.
213

    
214
    """
215
    disk = objects.Disk.FromDict(params[0])
216
    return backend.FindBlockDevice(disk)
217

    
218
  @staticmethod
219
  def perspective_blockdev_snapshot(params):
220
    """Create a snapshot device.
221

    
222
    Note that this is only valid for LVM disks, if we get passed
223
    something else we raise an exception. The snapshot device can be
224
    remove by calling the generic block device remove call.
225

    
226
    """
227
    cfbd = objects.Disk.FromDict(params[0])
228
    return backend.SnapshotBlockDevice(cfbd)
229

    
230
  @staticmethod
231
  def perspective_blockdev_grow(params):
232
    """Grow a stack of devices.
233

    
234
    """
235
    cfbd = objects.Disk.FromDict(params[0])
236
    amount = params[1]
237
    return backend.GrowBlockDevice(cfbd, amount)
238

    
239
  @staticmethod
240
  def perspective_blockdev_close(params):
241
    """Closes the given block devices.
242

    
243
    """
244
    disks = [objects.Disk.FromDict(cf) for cf in params]
245
    return backend.CloseBlockDevices(disks)
246

    
247
  # export/import  --------------------------
248

    
249
  @staticmethod
250
  def perspective_snapshot_export(params):
251
    """Export a given snapshot.
252

    
253
    """
254
    disk = objects.Disk.FromDict(params[0])
255
    dest_node = params[1]
256
    instance = objects.Instance.FromDict(params[2])
257
    return backend.ExportSnapshot(disk, dest_node, instance)
258

    
259
  @staticmethod
260
  def perspective_finalize_export(params):
261
    """Expose the finalize export functionality.
262

    
263
    """
264
    instance = objects.Instance.FromDict(params[0])
265
    snap_disks = [objects.Disk.FromDict(str_data)
266
                  for str_data in params[1]]
267
    return backend.FinalizeExport(instance, snap_disks)
268

    
269
  @staticmethod
270
  def perspective_export_info(params):
271
    """Query information about an existing export on this node.
272

    
273
    The given path may not contain an export, in which case we return
274
    None.
275

    
276
    """
277
    path = params[0]
278
    einfo = backend.ExportInfo(path)
279
    if einfo is None:
280
      return einfo
281
    return einfo.Dumps()
282

    
283
  @staticmethod
284
  def perspective_export_list(params):
285
    """List the available exports on this node.
286

    
287
    Note that as opposed to export_info, which may query data about an
288
    export in any path, this only queries the standard Ganeti path
289
    (constants.EXPORT_DIR).
290

    
291
    """
292
    return backend.ListExports()
293

    
294
  @staticmethod
295
  def perspective_export_remove(params):
296
    """Remove an export.
297

    
298
    """
299
    export = params[0]
300
    return backend.RemoveExport(export)
301

    
302
  # volume  --------------------------
303

    
304
  @staticmethod
305
  def perspective_volume_list(params):
306
    """Query the list of logical volumes in a given volume group.
307

    
308
    """
309
    vgname = params[0]
310
    return backend.GetVolumeList(vgname)
311

    
312
  @staticmethod
313
  def perspective_vg_list(params):
314
    """Query the list of volume groups.
315

    
316
    """
317
    return backend.ListVolumeGroups()
318

    
319
  # bridge  --------------------------
320

    
321
  @staticmethod
322
  def perspective_bridges_exist(params):
323
    """Check if all bridges given exist on this node.
324

    
325
    """
326
    bridges_list = params[0]
327
    return backend.BridgesExist(bridges_list)
328

    
329
  # instance  --------------------------
330

    
331
  @staticmethod
332
  def perspective_instance_os_add(params):
333
    """Install an OS on a given instance.
334

    
335
    """
336
    inst_s, os_disk, swap_disk = params
337
    inst = objects.Instance.FromDict(inst_s)
338
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
339

    
340
  @staticmethod
341
  def perspective_instance_run_rename(params):
342
    """Runs the OS rename script for an instance.
343

    
344
    """
345
    inst_s, old_name, os_disk, swap_disk = params
346
    inst = objects.Instance.FromDict(inst_s)
347
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
348

    
349
  @staticmethod
350
  def perspective_instance_os_import(params):
351
    """Run the import function of an OS onto a given instance.
352

    
353
    """
354
    inst_s, os_disk, swap_disk, src_node, src_image = params
355
    inst = objects.Instance.FromDict(inst_s)
356
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
357
                                        src_node, src_image)
358

    
359
  @staticmethod
360
  def perspective_instance_shutdown(params):
361
    """Shutdown an instance.
362

    
363
    """
364
    instance = objects.Instance.FromDict(params[0])
365
    return backend.ShutdownInstance(instance)
366

    
367
  @staticmethod
368
  def perspective_instance_start(params):
369
    """Start an instance.
370

    
371
    """
372
    instance = objects.Instance.FromDict(params[0])
373
    extra_args = params[1]
374
    return backend.StartInstance(instance, extra_args)
375

    
376
  @staticmethod
377
  def perspective_instance_migrate(params):
378
    """Migrates an instance.
379

    
380
    """
381
    instance, target, live = params
382
    return backend.MigrateInstance(instance, target, live)
383

    
384
  @staticmethod
385
  def perspective_instance_reboot(params):
386
    """Reboot an instance.
387

    
388
    """
389
    instance = objects.Instance.FromDict(params[0])
390
    reboot_type = params[1]
391
    extra_args = params[2]
392
    return backend.RebootInstance(instance, reboot_type, extra_args)
393

    
394
  @staticmethod
395
  def perspective_instance_info(params):
396
    """Query instance information.
397

    
398
    """
399
    return backend.GetInstanceInfo(params[0])
400

    
401
  @staticmethod
402
  def perspective_all_instances_info(params):
403
    """Query information about all instances.
404

    
405
    """
406
    return backend.GetAllInstancesInfo()
407

    
408
  @staticmethod
409
  def perspective_instance_list(params):
410
    """Query the list of running instances.
411

    
412
    """
413
    return backend.GetInstanceList()
414

    
415
  # node --------------------------
416

    
417
  @staticmethod
418
  def perspective_node_tcp_ping(params):
419
    """Do a TcpPing on the remote node.
420

    
421
    """
422
    return utils.TcpPing(params[1], params[2], timeout=params[3],
423
                         live_port_needed=params[4], source=params[0])
424

    
425
  @staticmethod
426
  def perspective_node_info(params):
427
    """Query node information.
428

    
429
    """
430
    vgname = params[0]
431
    return backend.GetNodeInfo(vgname)
432

    
433
  @staticmethod
434
  def perspective_node_add(params):
435
    """Complete the registration of this node in the cluster.
436

    
437
    """
438
    return backend.AddNode(params[0], params[1], params[2],
439
                           params[3], params[4], params[5])
440

    
441
  @staticmethod
442
  def perspective_node_verify(params):
443
    """Run a verify sequence on this node.
444

    
445
    """
446
    return backend.VerifyNode(params[0])
447

    
448
  @staticmethod
449
  def perspective_node_start_master(params):
450
    """Promote this node to master status.
451

    
452
    """
453
    return backend.StartMaster()
454

    
455
  @staticmethod
456
  def perspective_node_stop_master(params):
457
    """Demote this node from master status.
458

    
459
    """
460
    return backend.StopMaster()
461

    
462
  @staticmethod
463
  def perspective_node_leave_cluster(params):
464
    """Cleanup after leaving a cluster.
465

    
466
    """
467
    return backend.LeaveCluster()
468

    
469
  @staticmethod
470
  def perspective_node_volumes(params):
471
    """Query the list of all logical volume groups.
472

    
473
    """
474
    return backend.NodeVolumes()
475

    
476
  # cluster --------------------------
477

    
478
  @staticmethod
479
  def perspective_version(params):
480
    """Query version information.
481

    
482
    """
483
    return constants.PROTOCOL_VERSION
484

    
485
  @staticmethod
486
  def perspective_upload_file(params):
487
    """Upload a file.
488

    
489
    Note that the backend implementation imposes strict rules on which
490
    files are accepted.
491

    
492
    """
493
    return backend.UploadFile(*params)
494

    
495

    
496
  # os -----------------------
497

    
498
  @staticmethod
499
  def perspective_os_diagnose(params):
500
    """Query detailed information about existing OSes.
501

    
502
    """
503
    return [os.ToDict() for os in backend.DiagnoseOS()]
504

    
505
  @staticmethod
506
  def perspective_os_get(params):
507
    """Query information about a given OS.
508

    
509
    """
510
    name = params[0]
511
    try:
512
      os_obj = backend.OSFromDisk(name)
513
    except errors.InvalidOS, err:
514
      os_obj = objects.OS.FromInvalidOS(err)
515
    return os_obj.ToDict()
516

    
517
  # hooks -----------------------
518

    
519
  @staticmethod
520
  def perspective_hooks_runner(params):
521
    """Run hook scripts.
522

    
523
    """
524
    hpath, phase, env = params
525
    hr = backend.HooksRunner()
526
    return hr.RunHooks(hpath, phase, env)
527

    
528
  # iallocator -----------------
529

    
530
  @staticmethod
531
  def perspective_iallocator_runner(params):
532
    """Run an iallocator script.
533

    
534
    """
535
    name, idata = params
536
    iar = backend.IAllocatorRunner()
537
    return iar.Run(name, idata)
538

    
539
  # test -----------------------
540

    
541
  @staticmethod
542
  def perspective_test_delay(params):
543
    """Run test delay.
544

    
545
    """
546
    duration = params[0]
547
    return utils.TestDelay(duration)
548

    
549
  @staticmethod
550
  def perspective_file_storage_dir_create(params):
551
    """Create the file storage directory.
552

    
553
    """
554
    file_storage_dir = params[0]
555
    return backend.CreateFileStorageDir(file_storage_dir)
556

    
557
  @staticmethod
558
  def perspective_file_storage_dir_remove(params):
559
    """Remove the file storage directory.
560

    
561
    """
562
    file_storage_dir = params[0]
563
    return backend.RemoveFileStorageDir(file_storage_dir)
564

    
565
  @staticmethod
566
  def perspective_file_storage_dir_rename(params):
567
    """Rename the file storage directory.
568

    
569
    """
570
    old_file_storage_dir = params[0]
571
    new_file_storage_dir = params[1]
572
    return backend.RenameFileStorageDir(old_file_storage_dir,
573
                                        new_file_storage_dir)
574

    
575

    
576
def ParseOptions():
577
  """Parse the command line options.
578

    
579
  Returns:
580
    (options, args) as from OptionParser.parse_args()
581

    
582
  """
583
  parser = OptionParser(description="Ganeti node daemon",
584
                        usage="%prog [-f] [-d]",
585
                        version="%%prog (ganeti) %s" %
586
                        constants.RELEASE_VERSION)
587

    
588
  parser.add_option("-f", "--foreground", dest="fork",
589
                    help="Don't detach from the current terminal",
590
                    default=True, action="store_false")
591
  parser.add_option("-d", "--debug", dest="debug",
592
                    help="Enable some debug messages",
593
                    default=False, action="store_true")
594
  options, args = parser.parse_args()
595
  return options, args
596

    
597

    
598
def main():
599
  """Main function for the node daemon.
600

    
601
  """
602
  options, args = ParseOptions()
603
  utils.debug = options.debug
604
  for fname in (constants.SSL_CERT_FILE,):
605
    if not os.path.isfile(fname):
606
      print "config %s not there, will not run." % fname
607
      sys.exit(5)
608

    
609
  try:
610
    ss = ssconf.SimpleStore()
611
    port = ss.GetNodeDaemonPort()
612
    pwdata = ss.GetNodeDaemonPassword()
613
  except errors.ConfigurationError, err:
614
    print "Cluster configuration incomplete: '%s'" % str(err)
615
    sys.exit(5)
616

    
617
  # create the various SUB_RUN_DIRS, if not existing, so that we handle the
618
  # situation where RUN_DIR is tmpfs
619
  for dir_name in constants.SUB_RUN_DIRS:
620
    if not os.path.exists(dir_name):
621
      try:
622
        os.mkdir(dir_name, 0755)
623
      except EnvironmentError, err:
624
        if err.errno != errno.EEXIST:
625
          print ("Node setup wrong, cannot create directory %s: %s" %
626
                 (dir_name, err))
627
          sys.exit(5)
628
    if not os.path.isdir(dir_name):
629
      print ("Node setup wrong, %s is not a directory" % dir_name)
630
      sys.exit(5)
631

    
632
  # become a daemon
633
  if options.fork:
634
    utils.Daemonize(logfile=constants.LOG_NODESERVER)
635

    
636
  logger.SetupDaemon(logfile=constants.LOG_NODESERVER, debug=options.debug,
637
                     stderr_logging=not options.fork)
638
  logging.info("ganeti node daemon startup")
639

    
640
  global _EXIT_GANETI_NODED
641

    
642
  httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject)
643
  while (not _EXIT_GANETI_NODED):
644
    httpd.handle_request()
645

    
646

    
647
if __name__ == '__main__':
648
  main()