Statistics
| Branch: | Tag: | Revision:

root / lib / rpc.py @ cdb08f44

History | View | Annotate | Download (15.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script to show add a new node to the cluster
23

24
"""
25

    
26
# pylint: disable-msg=C0103
27

    
28
import os
29
import socket
30
import httplib
31

    
32
import simplejson
33

    
34
from ganeti import logger
35
from ganeti import utils
36
from ganeti import errors
37
from ganeti import constants
38
from ganeti import objects
39
from ganeti import ssconf
40

    
41
class NodeController:
42
  """Node-handling class.
43

44
  For each node that we speak with, we create an instance of this
45
  class, so that we have a safe place to store the details of this
46
  individual call.
47

48
  """
49
  def __init__(self, parent, node):
50
    self.parent = parent
51
    self.node = node
52
    self.failed = False
53

    
54
    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
55
    try:
56
      hc.connect()
57
      hc.putrequest('PUT', "/%s" % self.parent.procedure,
58
                    skip_accept_encoding=True)
59
      hc.putheader('Content-Length', str(len(parent.body)))
60
      hc.endheaders()
61
      hc.send(parent.body)
62
    except socket.error, err:
63
      logger.Error("Error connecting to %s: %s" % (node, str(err)))
64
      self.failed = True
65

    
66
  def get_response(self):
67
    """Try to process the response from the node.
68

69
    """
70
    if self.failed:
71
      # we already failed in connect
72
      return False
73
    resp = self.http_conn.getresponse()
74
    if resp.status != 200:
75
      return False
76
    try:
77
      length = int(resp.getheader('Content-Length', '0'))
78
    except ValueError:
79
      return False
80
    if not length:
81
      logger.Error("Zero-length reply from %s" % self.node)
82
      return False
83
    payload = resp.read(length)
84
    unload = simplejson.loads(payload)
85
    return unload
86

    
87

    
88
class Client:
89
  """RPC Client class.
90

91
  This class, given a (remote) method name, a list of parameters and a
92
  list of nodes, will contact (in parallel) all nodes, and return a
93
  dict of results (key: node name, value: result).
94

95
  One current bug is that generic failure is still signalled by
96
  'False' result, which is not good. This overloading of values can
97
  cause bugs.
98

99
  """
100
  result_set = False
101
  result = False
102
  allresult = []
103

    
104
  def __init__(self, procedure, args):
105
    ss = ssconf.SimpleStore()
106
    self.port = ss.GetNodeDaemonPort()
107
    self.nodepw = ss.GetNodeDaemonPassword()
108
    self.nc = {}
109
    self.results = {}
110
    self.procedure = procedure
111
    self.args = args
112
    self.body = simplejson.dumps(args)
113

    
114
  #--- generic connector -------------
115

    
116
  def connect_list(self, node_list):
117
    """Add a list of nodes to the target nodes.
118

119
    """
120
    for node in node_list:
121
      self.connect(node)
122

    
123
  def connect(self, connect_node):
124
    """Add a node to the target list.
125

126
    """
127
    self.nc[connect_node] = nc = NodeController(self, connect_node)
128

    
129
  def getresult(self):
130
    """Return the results of the call.
131

132
    """
133
    return self.results
134

    
135
  def run(self):
136
    """Wrapper over reactor.run().
137

138
    This function simply calls reactor.run() if we have any requests
139
    queued, otherwise it does nothing.
140

141
    """
142
    for node, nc in self.nc.items():
143
      self.results[node] = nc.get_response()
144

    
145

    
146
def call_volume_list(node_list, vg_name):
147
  """Gets the logical volumes present in a given volume group.
148

149
  This is a multi-node call.
150

151
  """
152
  c = Client("volume_list", [vg_name])
153
  c.connect_list(node_list)
154
  c.run()
155
  return c.getresult()
156

    
157

    
158
def call_vg_list(node_list):
159
  """Gets the volume group list.
160

161
  This is a multi-node call.
162

163
  """
164
  c = Client("vg_list", [])
165
  c.connect_list(node_list)
166
  c.run()
167
  return c.getresult()
168

    
169

    
170
def call_bridges_exist(node, bridges_list):
171
  """Checks if a node has all the bridges given.
172

173
  This method checks if all bridges given in the bridges_list are
174
  present on the remote node, so that an instance that uses interfaces
175
  on those bridges can be started.
176

177
  This is a single-node call.
178

179
  """
180
  c = Client("bridges_exist", [bridges_list])
181
  c.connect(node)
182
  c.run()
183
  return c.getresult().get(node, False)
184

    
185

    
186
def call_instance_start(node, instance, extra_args):
187
  """Starts an instance.
188

189
  This is a single-node call.
190

191
  """
192
  c = Client("instance_start", [instance.ToDict(), extra_args])
193
  c.connect(node)
194
  c.run()
195
  return c.getresult().get(node, False)
196

    
197

    
198
def call_instance_shutdown(node, instance):
199
  """Stops an instance.
200

201
  This is a single-node call.
202

203
  """
204
  c = Client("instance_shutdown", [instance.ToDict()])
205
  c.connect(node)
206
  c.run()
207
  return c.getresult().get(node, False)
208

    
209

    
210
def call_instance_reboot(node, instance, reboot_type, extra_args):
211
  """Reboots an instance.
212

213
  This is a single-node call.
214

215
  """
216
  c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
217
  c.connect(node)
218
  c.run()
219
  return c.getresult().get(node, False)
220

    
221

    
222
def call_instance_os_add(node, inst, osdev, swapdev):
223
  """Installs an OS on the given instance.
224

225
  This is a single-node call.
226

227
  """
228
  params = [inst.ToDict(), osdev, swapdev]
229
  c = Client("instance_os_add", params)
230
  c.connect(node)
231
  c.run()
232
  return c.getresult().get(node, False)
233

    
234

    
235
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
236
  """Run the OS rename script for an instance.
237

238
  This is a single-node call.
239

240
  """
241
  params = [inst.ToDict(), old_name, osdev, swapdev]
242
  c = Client("instance_run_rename", params)
243
  c.connect(node)
244
  c.run()
245
  return c.getresult().get(node, False)
246

    
247

    
248
def call_instance_info(node, instance):
249
  """Returns information about a single instance.
250

251
  This is a single-node call.
252

253
  """
254
  c = Client("instance_info", [instance])
255
  c.connect(node)
256
  c.run()
257
  return c.getresult().get(node, False)
258

    
259

    
260
def call_all_instances_info(node_list):
261
  """Returns information about all instances on a given node.
262

263
  This is a single-node call.
264

265
  """
266
  c = Client("all_instances_info", [])
267
  c.connect_list(node_list)
268
  c.run()
269
  return c.getresult()
270

    
271

    
272
def call_instance_list(node_list):
273
  """Returns the list of running instances on a given node.
274

275
  This is a single-node call.
276

277
  """
278
  c = Client("instance_list", [])
279
  c.connect_list(node_list)
280
  c.run()
281
  return c.getresult()
282

    
283

    
284
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
285
  """Do a TcpPing on the remote node
286

287
  This is a single-node call.
288
  """
289
  c = Client("node_tcp_ping", [source, target, port, timeout,
290
                               live_port_needed])
291
  c.connect(node)
292
  c.run()
293
  return c.getresult().get(node, False)
294

    
295

    
296
def call_node_info(node_list, vg_name):
297
  """Return node information.
298

299
  This will return memory information and volume group size and free
300
  space.
301

302
  This is a multi-node call.
303

304
  """
305
  c = Client("node_info", [vg_name])
306
  c.connect_list(node_list)
307
  c.run()
308
  retux = c.getresult()
309

    
310
  for node_name in retux:
311
    ret = retux.get(node_name, False)
312
    if type(ret) != dict:
313
      logger.Error("could not connect to node %s" % (node_name))
314
      ret = {}
315

    
316
    utils.CheckDict(ret,
317
                    { 'memory_total' : '-',
318
                      'memory_dom0' : '-',
319
                      'memory_free' : '-',
320
                      'vg_size' : 'node_unreachable',
321
                      'vg_free' : '-' },
322
                    "call_node_info",
323
                    )
324
  return retux
325

    
326

    
327
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
328
  """Add a node to the cluster.
329

330
  This is a single-node call.
331

332
  """
333
  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
334
  c = Client("node_add", params)
335
  c.connect(node)
336
  c.run()
337
  return c.getresult().get(node, False)
338

    
339

    
340
def call_node_verify(node_list, checkdict):
341
  """Request verification of given parameters.
342

343
  This is a multi-node call.
344

345
  """
346
  c = Client("node_verify", [checkdict])
347
  c.connect_list(node_list)
348
  c.run()
349
  return c.getresult()
350

    
351

    
352
def call_node_start_master(node):
353
  """Tells a node to activate itself as a master.
354

355
  This is a single-node call.
356

357
  """
358
  c = Client("node_start_master", [])
359
  c.connect(node)
360
  c.run()
361
  return c.getresult().get(node, False)
362

    
363

    
364
def call_node_stop_master(node):
365
  """Tells a node to demote itself from master status.
366

367
  This is a single-node call.
368

369
  """
370
  c = Client("node_stop_master", [])
371
  c.connect(node)
372
  c.run()
373
  return c.getresult().get(node, False)
374

    
375

    
376
def call_version(node_list):
377
  """Query node version.
378

379
  This is a multi-node call.
380

381
  """
382
  c = Client("version", [])
383
  c.connect_list(node_list)
384
  c.run()
385
  return c.getresult()
386

    
387

    
388
def call_blockdev_create(node, bdev, size, owner, on_primary, info):
389
  """Request creation of a given block device.
390

391
  This is a single-node call.
392

393
  """
394
  params = [bdev.ToDict(), size, owner, on_primary, info]
395
  c = Client("blockdev_create", params)
396
  c.connect(node)
397
  c.run()
398
  return c.getresult().get(node, False)
399

    
400

    
401
def call_blockdev_remove(node, bdev):
402
  """Request removal of a given block device.
403

404
  This is a single-node call.
405

406
  """
407
  c = Client("blockdev_remove", [bdev.ToDict()])
408
  c.connect(node)
409
  c.run()
410
  return c.getresult().get(node, False)
411

    
412

    
413
def call_blockdev_rename(node, devlist):
414
  """Request rename of the given block devices.
415

416
  This is a single-node call.
417

418
  """
419
  params = [(d.ToDict(), uid) for d, uid in devlist]
420
  c = Client("blockdev_rename", params)
421
  c.connect(node)
422
  c.run()
423
  return c.getresult().get(node, False)
424

    
425

    
426
def call_blockdev_assemble(node, disk, owner, on_primary):
427
  """Request assembling of a given block device.
428

429
  This is a single-node call.
430

431
  """
432
  params = [disk.ToDict(), owner, on_primary]
433
  c = Client("blockdev_assemble", params)
434
  c.connect(node)
435
  c.run()
436
  return c.getresult().get(node, False)
437

    
438

    
439
def call_blockdev_shutdown(node, disk):
440
  """Request shutdown of a given block device.
441

442
  This is a single-node call.
443

444
  """
445
  c = Client("blockdev_shutdown", [disk.ToDict()])
446
  c.connect(node)
447
  c.run()
448
  return c.getresult().get(node, False)
449

    
450

    
451
def call_blockdev_addchildren(node, bdev, ndevs):
452
  """Request adding a list of children to a (mirroring) device.
453

454
  This is a single-node call.
455

456
  """
457
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
458
  c = Client("blockdev_addchildren", params)
459
  c.connect(node)
460
  c.run()
461
  return c.getresult().get(node, False)
462

    
463

    
464
def call_blockdev_removechildren(node, bdev, ndevs):
465
  """Request removing a list of children from a (mirroring) device.
466

467
  This is a single-node call.
468

469
  """
470
  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
471
  c = Client("blockdev_removechildren", params)
472
  c.connect(node)
473
  c.run()
474
  return c.getresult().get(node, False)
475

    
476

    
477
def call_blockdev_getmirrorstatus(node, disks):
478
  """Request status of a (mirroring) device.
479

480
  This is a single-node call.
481

482
  """
483
  params = [dsk.ToDict() for dsk in disks]
484
  c = Client("blockdev_getmirrorstatus", params)
485
  c.connect(node)
486
  c.run()
487
  return c.getresult().get(node, False)
488

    
489

    
490
def call_blockdev_find(node, disk):
491
  """Request identification of a given block device.
492

493
  This is a single-node call.
494

495
  """
496
  c = Client("blockdev_find", [disk.ToDict()])
497
  c.connect(node)
498
  c.run()
499
  return c.getresult().get(node, False)
500

    
501

    
502
def call_upload_file(node_list, file_name):
503
  """Upload a file.
504

505
  The node will refuse the operation in case the file is not on the
506
  approved file list.
507

508
  This is a multi-node call.
509

510
  """
511
  fh = file(file_name)
512
  try:
513
    data = fh.read()
514
  finally:
515
    fh.close()
516
  st = os.stat(file_name)
517
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
518
            st.st_atime, st.st_mtime]
519
  c = Client("upload_file", params)
520
  c.connect_list(node_list)
521
  c.run()
522
  return c.getresult()
523

    
524

    
525
def call_os_diagnose(node_list):
526
  """Request a diagnose of OS definitions.
527

528
  This is a multi-node call.
529

530
  """
531
  c = Client("os_diagnose", [])
532
  c.connect_list(node_list)
533
  c.run()
534
  result = c.getresult()
535
  new_result = {}
536
  for node_name in result:
537
    if result[node_name]:
538
      nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
539
    else:
540
      nr = []
541
    new_result[node_name] = nr
542
  return new_result
543

    
544

    
545
def call_os_get(node, name):
546
  """Returns an OS definition.
547

548
  This is a single-node call.
549

550
  """
551
  c = Client("os_get", [name])
552
  c.connect(node)
553
  c.run()
554
  result = c.getresult().get(node, False)
555
  if isinstance(result, dict):
556
    return objects.OS.FromDict(result)
557
  else:
558
    return result
559

    
560

    
561
def call_hooks_runner(node_list, hpath, phase, env):
562
  """Call the hooks runner.
563

564
  Args:
565
    - op: the OpCode instance
566
    - env: a dictionary with the environment
567

568
  This is a multi-node call.
569

570
  """
571
  params = [hpath, phase, env]
572
  c = Client("hooks_runner", params)
573
  c.connect_list(node_list)
574
  c.run()
575
  result = c.getresult()
576
  return result
577

    
578

    
579
def call_blockdev_snapshot(node, cf_bdev):
580
  """Request a snapshot of the given block device.
581

582
  This is a single-node call.
583

584
  """
585
  c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
586
  c.connect(node)
587
  c.run()
588
  return c.getresult().get(node, False)
589

    
590

    
591
def call_snapshot_export(node, snap_bdev, dest_node, instance):
592
  """Request the export of a given snapshot.
593

594
  This is a single-node call.
595

596
  """
597
  params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
598
  c = Client("snapshot_export", params)
599
  c.connect(node)
600
  c.run()
601
  return c.getresult().get(node, False)
602

    
603

    
604
def call_finalize_export(node, instance, snap_disks):
605
  """Request the completion of an export operation.
606

607
  This writes the export config file, etc.
608

609
  This is a single-node call.
610

611
  """
612
  flat_disks = []
613
  for disk in snap_disks:
614
    flat_disks.append(disk.ToDict())
615
  params = [instance.ToDict(), flat_disks]
616
  c = Client("finalize_export", params)
617
  c.connect(node)
618
  c.run()
619
  return c.getresult().get(node, False)
620

    
621

    
622
def call_export_info(node, path):
623
  """Queries the export information in a given path.
624

625
  This is a single-node call.
626

627
  """
628
  c = Client("export_info", [path])
629
  c.connect(node)
630
  c.run()
631
  result = c.getresult().get(node, False)
632
  if not result:
633
    return result
634
  return objects.SerializableConfigParser.Loads(result)
635

    
636

    
637
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
638
  """Request the import of a backup into an instance.
639

640
  This is a single-node call.
641

642
  """
643
  params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
644
  c = Client("instance_os_import", params)
645
  c.connect(node)
646
  c.run()
647
  return c.getresult().get(node, False)
648

    
649

    
650
def call_export_list(node_list):
651
  """Gets the stored exports list.
652

653
  This is a multi-node call.
654

655
  """
656
  c = Client("export_list", [])
657
  c.connect_list(node_list)
658
  c.run()
659
  result = c.getresult()
660
  return result
661

    
662

    
663
def call_export_remove(node, export):
664
  """Requests removal of a given export.
665

666
  This is a single-node call.
667

668
  """
669
  c = Client("export_remove", [export])
670
  c.connect(node)
671
  c.run()
672
  return c.getresult().get(node, False)
673

    
674

    
675
def call_node_leave_cluster(node):
676
  """Requests a node to clean the cluster information it has.
677

678
  This will remove the configuration information from the ganeti data
679
  dir.
680

681
  This is a single-node call.
682

683
  """
684
  c = Client("node_leave_cluster", [])
685
  c.connect(node)
686
  c.run()
687
  return c.getresult().get(node, False)
688

    
689

    
690
def call_node_volumes(node_list):
691
  """Gets all volumes on node(s).
692

693
  This is a multi-node call.
694

695
  """
696
  c = Client("node_volumes", [])
697
  c.connect_list(node_list)
698
  c.run()
699
  return c.getresult()
700

    
701

    
702
def call_test_delay(node_list, duration):
703
  """Sleep for a fixed time on given node(s).
704

705
  This is a multi-node call.
706

707
  """
708
  c = Client("test_delay", [duration])
709
  c.connect_list(node_list)
710
  c.run()
711
  return c.getresult()