Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-noded @ b15d625f

History | View | Annotate | Download (15.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti node daemon"""
23

    
24
# functions in this module need to have a given name structure, so:
25
# pylint: disable-msg=C0103
26

    
27
import os
28
import sys
29
import resource
30
import traceback
31
import BaseHTTPServer
32
import simplejson
33

    
34
from optparse import OptionParser
35

    
36

    
37
from ganeti import backend
38
from ganeti import logger
39
from ganeti import constants
40
from ganeti import objects
41
from ganeti import errors
42
from ganeti import ssconf
43
from ganeti import utils
44

    
45

    
46
class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler):
47
  """The server implementation.
48

    
49
  This class holds all methods exposed over the RPC interface.
50

    
51
  """
52
  def do_PUT(self):
53
    """Handle a post request.
54

    
55
    """
56
    path = self.path
57
    if path.startswith("/"):
58
      path = path[1:]
59
    mname = "perspective_%s" % path
60
    if not hasattr(self, mname):
61
      self.send_error(404)
62
      return False
63

    
64
    method = getattr(self, mname)
65
    try:
66
      body_length = int(self.headers.get('Content-Length', '0'))
67
    except ValueError:
68
      self.send_error(400, 'No Content-Length header or invalid format')
69
      return False
70

    
71
    try:
72
      body = self.rfile.read(body_length)
73
    except socket.error, err:
74
      logger.Error("Socket error while reading: %s" % str(err))
75
      return
76
    try:
77
      params = simplejson.loads(body)
78
      result = method(params)
79
      payload = simplejson.dumps(result)
80
    except Exception, err:
81
      self.send_error(500, "Error: %s" % str(err))
82
      return False
83
    self.send_response(200)
84
    self.send_header('Content-Length', str(len(payload)))
85
    self.end_headers()
86
    self.wfile.write(payload)
87
    return True
88

    
89
  def log_message(self, format, *args):
90
    """Log a request to the log.
91

    
92
    This is the same as the parent, we just log somewhere else.
93

    
94
    """
95
    msg = ("%s - - [%s] %s\n" %
96
           (self.address_string(),
97
            self.log_date_time_string(),
98
            format % args))
99
    logger.Debug(msg)
100

    
101
  # the new block devices  --------------------------
102

    
103
  @staticmethod
104
  def perspective_blockdev_create(params):
105
    """Create a block device.
106

    
107
    """
108
    bdev_s, size, owner, on_primary, info = params
109
    bdev = objects.Disk.FromDict(bdev_s)
110
    if bdev is None:
111
      raise ValueError("can't unserialize data!")
112
    return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
113

    
114
  @staticmethod
115
  def perspective_blockdev_remove(params):
116
    """Remove a block device.
117

    
118
    """
119
    bdev_s = params[0]
120
    bdev = objects.Disk.FromDict(bdev_s)
121
    return backend.RemoveBlockDevice(bdev)
122

    
123
  @staticmethod
124
  def perspective_blockdev_rename(params):
125
    """Remove a block device.
126

    
127
    """
128
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
129
    return backend.RenameBlockDevices(devlist)
130

    
131
  @staticmethod
132
  def perspective_blockdev_assemble(params):
133
    """Assemble a block device.
134

    
135
    """
136
    bdev_s, owner, on_primary = params
137
    bdev = objects.Disk.FromDict(bdev_s)
138
    if bdev is None:
139
      raise ValueError("can't unserialize data!")
140
    return backend.AssembleBlockDevice(bdev, owner, on_primary)
141

    
142
  @staticmethod
143
  def perspective_blockdev_shutdown(params):
144
    """Shutdown a block device.
145

    
146
    """
147
    bdev_s = params[0]
148
    bdev = objects.Disk.FromDict(bdev_s)
149
    if bdev is None:
150
      raise ValueError("can't unserialize data!")
151
    return backend.ShutdownBlockDevice(bdev)
152

    
153
  @staticmethod
154
  def perspective_blockdev_addchildren(params):
155
    """Add a child to a mirror device.
156

    
157
    Note: this is only valid for mirror devices. It's the caller's duty
158
    to send a correct disk, otherwise we raise an error.
159

    
160
    """
161
    bdev_s, ndev_s = params
162
    bdev = objects.Disk.FromDict(bdev_s)
163
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
164
    if bdev is None or ndevs.count(None) > 0:
165
      raise ValueError("can't unserialize data!")
166
    return backend.MirrorAddChildren(bdev, ndevs)
167

    
168
  @staticmethod
169
  def perspective_blockdev_removechildren(params):
170
    """Remove a child from a mirror device.
171

    
172
    This is only valid for mirror devices, of course. It's the callers
173
    duty to send a correct disk, otherwise we raise an error.
174

    
175
    """
176
    bdev_s, ndev_s = params
177
    bdev = objects.Disk.FromDict(bdev_s)
178
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
179
    if bdev is None or ndevs.count(None) > 0:
180
      raise ValueError("can't unserialize data!")
181
    return backend.MirrorRemoveChildren(bdev, ndevs)
182

    
183
  @staticmethod
184
  def perspective_blockdev_getmirrorstatus(params):
185
    """Return the mirror status for a list of disks.
186

    
187
    """
188
    disks = [objects.Disk.FromDict(dsk_s)
189
            for dsk_s in params]
190
    return backend.GetMirrorStatus(disks)
191

    
192
  @staticmethod
193
  def perspective_blockdev_find(params):
194
    """Expose the FindBlockDevice functionality for a disk.
195

    
196
    This will try to find but not activate a disk.
197

    
198
    """
199
    disk = objects.Disk.FromDict(params[0])
200
    return backend.FindBlockDevice(disk)
201

    
202
  @staticmethod
203
  def perspective_blockdev_snapshot(params):
204
    """Create a snapshot device.
205

    
206
    Note that this is only valid for LVM disks, if we get passed
207
    something else we raise an exception. The snapshot device can be
208
    remove by calling the generic block device remove call.
209

    
210
    """
211
    cfbd = objects.Disk.FromDict(params[0])
212
    return backend.SnapshotBlockDevice(cfbd)
213

    
214
  # export/import  --------------------------
215

    
216
  @staticmethod
217
  def perspective_snapshot_export(params):
218
    """Export a given snapshot.
219

    
220
    """
221
    disk = objects.Disk.FromDict(params[0])
222
    dest_node = params[1]
223
    instance = objects.Instance.FromDict(params[2])
224
    return backend.ExportSnapshot(disk, dest_node, instance)
225

    
226
  @staticmethod
227
  def perspective_finalize_export(params):
228
    """Expose the finalize export functionality.
229

    
230
    """
231
    instance = objects.Instance.FromDict(params[0])
232
    snap_disks = [objects.Disk.FromDict(str_data)
233
                  for str_data in params[1]]
234
    return backend.FinalizeExport(instance, snap_disks)
235

    
236
  @staticmethod
237
  def perspective_export_info(params):
238
    """Query information about an existing export on this node.
239

    
240
    The given path may not contain an export, in which case we return
241
    None.
242

    
243
    """
244
    path = params[0]
245
    einfo = backend.ExportInfo(path)
246
    if einfo is None:
247
      return einfo
248
    return einfo.Dumps()
249

    
250
  @staticmethod
251
  def perspective_export_list(params):
252
    """List the available exports on this node.
253

    
254
    Note that as opposed to export_info, which may query data about an
255
    export in any path, this only queries the standard Ganeti path
256
    (constants.EXPORT_DIR).
257

    
258
    """
259
    return backend.ListExports()
260

    
261
  @staticmethod
262
  def perspective_export_remove(params):
263
    """Remove an export.
264

    
265
    """
266
    export = params[0]
267
    return backend.RemoveExport(export)
268

    
269
  # volume  --------------------------
270

    
271
  @staticmethod
272
  def perspective_volume_list(params):
273
    """Query the list of logical volumes in a given volume group.
274

    
275
    """
276
    vgname = params[0]
277
    return backend.GetVolumeList(vgname)
278

    
279
  @staticmethod
280
  def perspective_vg_list(params):
281
    """Query the list of volume groups.
282

    
283
    """
284
    return backend.ListVolumeGroups()
285

    
286
  # bridge  --------------------------
287

    
288
  @staticmethod
289
  def perspective_bridges_exist(params):
290
    """Check if all bridges given exist on this node.
291

    
292
    """
293
    bridges_list = params[0]
294
    return backend.BridgesExist(bridges_list)
295

    
296
  # instance  --------------------------
297

    
298
  @staticmethod
299
  def perspective_instance_os_add(params):
300
    """Install an OS on a given instance.
301

    
302
    """
303
    inst_s, os_disk, swap_disk = params
304
    inst = objects.Instance.FromDict(inst_s)
305
    return backend.AddOSToInstance(inst, os_disk, swap_disk)
306

    
307
  @staticmethod
308
  def perspective_instance_run_rename(params):
309
    """Runs the OS rename script for an instance.
310

    
311
    """
312
    inst_s, old_name, os_disk, swap_disk = params
313
    inst = objects.Instance.FromDict(inst_s)
314
    return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
315

    
316
  @staticmethod
317
  def perspective_instance_os_import(params):
318
    """Run the import function of an OS onto a given instance.
319

    
320
    """
321
    inst_s, os_disk, swap_disk, src_node, src_image = params
322
    inst = objects.Instance.FromDict(inst_s)
323
    return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
324
                                        src_node, src_image)
325

    
326
  @staticmethod
327
  def perspective_instance_shutdown(params):
328
    """Shutdown an instance.
329

    
330
    """
331
    instance = objects.Instance.FromDict(params[0])
332
    return backend.ShutdownInstance(instance)
333

    
334
  @staticmethod
335
  def perspective_instance_start(params):
336
    """Start an instance.
337

    
338
    """
339
    instance = objects.Instance.FromDict(params[0])
340
    extra_args = params[1]
341
    return backend.StartInstance(instance, extra_args)
342

    
343
  @staticmethod
344
  def perspective_instance_reboot(params):
345
    """Reboot an instance.
346

    
347
    """
348
    instance = objects.Instance.FromDict(params[0])
349
    reboot_type = params[1]
350
    extra_args = params[2]
351
    return backend.RebootInstance(instance, reboot_type, extra_args)
352

    
353
  @staticmethod
354
  def perspective_instance_info(params):
355
    """Query instance information.
356

    
357
    """
358
    return backend.GetInstanceInfo(params[0])
359

    
360
  @staticmethod
361
  def perspective_all_instances_info(params):
362
    """Query information about all instances.
363

    
364
    """
365
    return backend.GetAllInstancesInfo()
366

    
367
  @staticmethod
368
  def perspective_instance_list(params):
369
    """Query the list of running instances.
370

    
371
    """
372
    return backend.GetInstanceList()
373

    
374
  # node --------------------------
375

    
376
  @staticmethod
377
  def perspective_node_tcp_ping(params):
378
    """Do a TcpPing on the remote node.
379

    
380
    """
381
    return utils.TcpPing(params[1], params[2], timeout=params[3],
382
                         live_port_needed=params[4], source=params[0])
383

    
384
  @staticmethod
385
  def perspective_node_info(params):
386
    """Query node information.
387

    
388
    """
389
    vgname = params[0]
390
    return backend.GetNodeInfo(vgname)
391

    
392
  @staticmethod
393
  def perspective_node_add(params):
394
    """Complete the registration of this node in the cluster.
395

    
396
    """
397
    return backend.AddNode(params[0], params[1], params[2],
398
                           params[3], params[4], params[5])
399

    
400
  @staticmethod
401
  def perspective_node_verify(params):
402
    """Run a verify sequence on this node.
403

    
404
    """
405
    return backend.VerifyNode(params[0])
406

    
407
  @staticmethod
408
  def perspective_node_start_master(params):
409
    """Promote this node to master status.
410

    
411
    """
412
    return backend.StartMaster()
413

    
414
  @staticmethod
415
  def perspective_node_stop_master(params):
416
    """Demote this node from master status.
417

    
418
    """
419
    return backend.StopMaster()
420

    
421
  @staticmethod
422
  def perspective_node_leave_cluster(params):
423
    """Cleanup after leaving a cluster.
424

    
425
    """
426
    return backend.LeaveCluster()
427

    
428
  @staticmethod
429
  def perspective_node_volumes(params):
430
    """Query the list of all logical volume groups.
431

    
432
    """
433
    return backend.NodeVolumes()
434

    
435
  # cluster --------------------------
436

    
437
  @staticmethod
438
  def perspective_version(params):
439
    """Query version information.
440

    
441
    """
442
    return constants.PROTOCOL_VERSION
443

    
444
  @staticmethod
445
  def perspective_upload_file(params):
446
    """Upload a file.
447

    
448
    Note that the backend implementation imposes strict rules on which
449
    files are accepted.
450

    
451
    """
452
    return backend.UploadFile(*params)
453

    
454

    
455
  # os -----------------------
456

    
457
  @staticmethod
458
  def perspective_os_diagnose(params):
459
    """Query detailed information about existing OSes.
460

    
461
    """
462
    return [os.ToDict() for os in backend.DiagnoseOS()]
463

    
464
  @staticmethod
465
  def perspective_os_get(params):
466
    """Query information about a given OS.
467

    
468
    """
469
    name = params[0]
470
    try:
471
      os_obj = backend.OSFromDisk(name)
472
    except errors.InvalidOS, err:
473
      os_obj = objects.OS.FromInvalidOS(err)
474
    return os_obj.ToDict()
475

    
476
  # hooks -----------------------
477

    
478
  @staticmethod
479
  def perspective_hooks_runner(params):
480
    """Run hook scripts.
481

    
482
    """
483
    hpath, phase, env = params
484
    hr = backend.HooksRunner()
485
    return hr.RunHooks(hpath, phase, env)
486

    
487
  # test -----------------------
488

    
489
  @staticmethod
490
  def perspective_test_delay(params):
491
    """Run test delay.
492

    
493
    """
494
    duration = params[0]
495
    return utils.TestDelay(duration)
496

    
497

    
498
def ParseOptions():
499
  """Parse the command line options.
500

    
501
  Returns:
502
    (options, args) as from OptionParser.parse_args()
503

    
504
  """
505
  parser = OptionParser(description="Ganeti node daemon",
506
                        usage="%prog [-f] [-d]",
507
                        version="%%prog (ganeti) %s" %
508
                        constants.RELEASE_VERSION)
509

    
510
  parser.add_option("-f", "--foreground", dest="fork",
511
                    help="Don't detach from the current terminal",
512
                    default=True, action="store_false")
513
  parser.add_option("-d", "--debug", dest="debug",
514
                    help="Enable some debug messages",
515
                    default=False, action="store_true")
516
  options, args = parser.parse_args()
517
  return options, args
518

    
519

    
520
def main():
521
  """Main function for the node daemon.
522

    
523
  """
524
  options, args = ParseOptions()
525
  utils.debug = options.debug
526
  for fname in (constants.SSL_CERT_FILE,):
527
    if not os.path.isfile(fname):
528
      print "config %s not there, will not run." % fname
529
      sys.exit(5)
530

    
531
  try:
532
    ss = ssconf.SimpleStore()
533
    port = ss.GetNodeDaemonPort()
534
    pwdata = ss.GetNodeDaemonPassword()
535
  except errors.ConfigurationError, err:
536
    print "Cluster configuration incomplete: '%s'" % str(err)
537
    sys.exit(5)
538

    
539
  # become a daemon
540
  if options.fork:
541
    createDaemon()
542

    
543
  logger.SetupLogging(twisted_workaround=True, debug=options.debug,
544
                      program="ganeti-noded")
545

    
546
  httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject)
547
  httpd.serve_forever()
548

    
549

    
550
def createDaemon():
551
  """Detach a process from the controlling terminal and run it in the
552
  background as a daemon.
553

    
554
  """
555
  UMASK = 077
556
  WORKDIR = "/"
557
  # Default maximum for the number of available file descriptors.
558
  if 'SC_OPEN_MAX' in os.sysconf_names:
559
    try:
560
      MAXFD = os.sysconf('SC_OPEN_MAX')
561
      if MAXFD < 0:
562
        MAXFD = 1024
563
    except OSError:
564
      MAXFD = 1024
565
  else:
566
    MAXFD = 1024
567
  # The standard I/O file descriptors are redirected to /dev/null by default.
568
  #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
569
  REDIRECT_TO = constants.LOG_NODESERVER
570
  try:
571
    pid = os.fork()
572
  except OSError, e:
573
    raise Exception("%s [%d]" % (e.strerror, e.errno))
574
  if (pid == 0):  # The first child.
575
    os.setsid()
576
    try:
577
      pid = os.fork() # Fork a second child.
578
    except OSError, e:
579
      raise Exception("%s [%d]" % (e.strerror, e.errno))
580
    if (pid == 0):  # The second child.
581
      os.chdir(WORKDIR)
582
      os.umask(UMASK)
583
    else:
584
      # exit() or _exit()?  See below.
585
      os._exit(0) # Exit parent (the first child) of the second child.
586
  else:
587
    os._exit(0) # Exit parent of the first child.
588
  maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
589
  if (maxfd == resource.RLIM_INFINITY):
590
    maxfd = MAXFD
591

    
592
  # Iterate through and close all file descriptors.
593
  for fd in range(0, maxfd):
594
    try:
595
      os.close(fd)
596
    except OSError: # ERROR, fd wasn't open to begin with (ignored)
597
      pass
598
  os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND, 0600)
599
  # Duplicate standard input to standard output and standard error.
600
  os.dup2(0, 1)     # standard output (1)
601
  os.dup2(0, 2)     # standard error (2)
602
  return(0)
603

    
604

    
605
if __name__ == '__main__':
606
  main()