Statistics
| Branch: | Tag: | Revision:

root / lib / rapi / client.py @ f2f88abf

History | View | Annotate | Download (22.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti RAPI client."""
23

    
24
import httplib
25
import httplib2
26
import simplejson
27
import socket
28
import urllib
29
from OpenSSL import SSL
30
from OpenSSL import crypto
31

    
32

    
33
HTTP_DELETE = "DELETE"
34
HTTP_GET = "GET"
35
HTTP_PUT = "PUT"
36
HTTP_POST = "POST"
37
REPLACE_DISK_PRI = "replace_on_primary"
38
REPLACE_DISK_SECONDARY = "replace_on_secondary"
39
REPLACE_DISK_CHG = "replace_new_secondary"
40
REPLACE_DISK_AUTO = "replace_auto"
41
VALID_REPLACEMENT_MODES = frozenset([
42
    REPLACE_DISK_PRI, REPLACE_DISK_SECONDARY, REPLACE_DISK_CHG,
43
    REPLACE_DISK_AUTO
44
    ])
45
VALID_NODE_ROLES = frozenset([
46
    "drained", "master", "master-candidate", "offline", "regular"
47
    ])
48
VALID_STORAGE_TYPES = frozenset(["file", "lvm-pv", "lvm-vg"])
49

    
50

    
51
class Error(Exception):
52
  """Base error class for this module.
53

54
  """
55
  pass
56

    
57

    
58
class CertificateError(Error):
59
  """Raised when a problem is found with the SSL certificate.
60

61
  """
62
  pass
63

    
64

    
65
class GanetiApiError(Error):
66
  """Generic error raised from Ganeti API.
67

68
  """
69
  pass
70

    
71

    
72
class InvalidReplacementMode(Error):
73
  """Raised when an invalid disk replacement mode is attempted.
74

75
  """
76
  pass
77

    
78

    
79
class InvalidStorageType(Error):
80
  """Raised when an invalid storage type is used.
81

82
  """
83
  pass
84

    
85

    
86
class InvalidNodeRole(Error):
87
  """Raised when an invalid node role is used.
88

89
  """
90
  pass
91

    
92

    
93
class GanetiRapiClient(object):
94
  """Ganeti RAPI client.
95

96
  """
97

    
98
  USER_AGENT = "Ganeti RAPI Client"
99

    
100
  def __init__(self, master_hostname, port=5080, username=None, password=None,
101
               ssl_cert_file=None):
102
    """Constructor.
103

104
    @type master_hostname: str
105
    @param master_hostname: the ganeti cluster master to interact with
106
    @type port: int
107
    @param port: the port on which the RAPI is running. (default is 5080)
108
    @type username: str
109
    @param username: the username to connect with
110
    @type password: str
111
    @param password: the password to connect with
112
    @type ssl_cert_file: str or None
113
    @param ssl_cert_file: path to the expected SSL certificate. if None, SSL
114
        certificate will not be verified
115

116
    """
117
    self._master_hostname = master_hostname
118
    self._port = port
119

    
120
    self._version = None
121
    self._http = httplib2.Http()
122

    
123
    # Older versions of httplib2 don't support the connection_type argument
124
    # to request(), so we have to manually specify the connection object in the
125
    # internal dict.
126
    base_url = self._MakeUrl("/", prepend_version=False)
127
    scheme, authority, _, _, _ = httplib2.parse_uri(base_url)
128
    conn_key = "%s:%s" % (scheme, authority)
129
    self._http.connections[conn_key] = \
130
      HTTPSConnectionOpenSSL(master_hostname, port, cert_file=ssl_cert_file)
131

    
132
    self._headers = {
133
        "Accept": "text/plain",
134
        "Content-type": "application/x-www-form-urlencoded",
135
        "User-Agent": self.USER_AGENT}
136

    
137
    if username is not None and password is not None:
138
      self._http.add_credentials(username, password)
139

    
140
  def _MakeUrl(self, path, query=None, prepend_version=True):
141
    """Constructs the URL to pass to the HTTP client.
142

143
    @type path: str
144
    @param path: HTTP URL path
145
    @type query: list of two-tuples
146
    @param query: query arguments to pass to urllib.urlencode
147
    @type prepend_version: bool
148
    @param prepend_version: whether to automatically fetch and prepend the
149
        Ganeti RAPI version to the URL path
150

151
    @rtype:  str
152
    @return: URL path
153

154
    """
155
    if prepend_version:
156
      path = "/%d%s" % (self.GetVersion(), path)
157

    
158
    return "https://%(host)s:%(port)d%(path)s?%(query)s" % {
159
        "host": self._master_hostname,
160
        "port": self._port,
161
        "path": path,
162
        "query": urllib.urlencode(query or [])}
163

    
164
  def _SendRequest(self, method, path, query=None, content=None,
165
                   prepend_version=True):
166
    """Sends an HTTP request.
167

168
    This constructs a full URL, encodes and decodes HTTP bodies, and
169
    handles invalid responses in a pythonic way.
170

171
    @type method: str
172
    @param method: HTTP method to use
173
    @type path: str
174
    @param path: HTTP URL path
175
    @type query: list of two-tuples
176
    @param query: query arguments to pass to urllib.urlencode
177
    @type content: str or None
178
    @param content: HTTP body content
179
    @type prepend_version: bool
180
    @param prepend_version: whether to automatically fetch and prepend the
181
        Ganeti RAPI version to the URL path
182

183
    @rtype: str
184
    @return: JSON-Decoded response
185

186
    @raises CertificateError: If an invalid SSL certificate is found
187
    @raises GanetiApiError: If an invalid response is returned
188

189
    """
190
    if content:
191
      content = simplejson.JSONEncoder(sort_keys=True).encode(content)
192

    
193
    url = self._MakeUrl(path, query, prepend_version)
194
    try:
195
      resp_headers, resp_content = self._http.request(url, method,
196
          body=content, headers=self._headers)
197
    except (crypto.Error, SSL.Error):
198
      raise CertificateError("Invalid SSL certificate.")
199

    
200
    if resp_content:
201
      resp_content = simplejson.loads(resp_content)
202

    
203
    # TODO: Are there other status codes that are valid? (redirect?)
204
    if resp_headers.status != 200:
205
      if isinstance(resp_content, dict):
206
        msg = ("%s %s: %s" %
207
            (resp_content["code"], resp_content["message"],
208
             resp_content["explain"]))
209
      else:
210
        msg = resp_content
211
      raise GanetiApiError(msg)
212

    
213
    return resp_content
214

    
215
  def GetVersion(self):
216
    """Gets the Remote API version running on the cluster.
217

218
    @rtype: int
219
    @return: Ganeti Remote API version
220

221
    """
222
    if self._version is None:
223
      self._version = self._SendRequest(HTTP_GET, "/version",
224
                                        prepend_version=False)
225
    return self._version
226

    
227
  def GetOperatingSystems(self):
228
    """Gets the Operating Systems running in the Ganeti cluster.
229

230
    @rtype: list of str
231
    @return: operating systems
232

233
    """
234
    return self._SendRequest(HTTP_GET, "/os")
235

    
236
  def GetInfo(self):
237
    """Gets info about the cluster.
238

239
    @rtype: dict
240
    @return: information about the cluster
241

242
    """
243
    return self._SendRequest(HTTP_GET, "/info")
244

    
245
  def GetClusterTags(self):
246
    """Gets the cluster tags.
247

248
    @rtype: list of str
249
    @return: cluster tags
250

251
    """
252
    return self._SendRequest(HTTP_GET, "/tags")
253

    
254
  def AddClusterTags(self, tags, dry_run=False):
255
    """Adds tags to the cluster.
256

257
    @type tags: list of str
258
    @param tags: tags to add to the cluster
259
    @type dry_run: bool
260
    @param dry_run: whether to perform a dry run
261

262
    @rtype: int
263
    @return: job id
264

265
    """
266
    query = [("tag", t) for t in tags]
267
    if dry_run:
268
      query.append(("dry-run", 1))
269

    
270
    return self._SendRequest(HTTP_PUT, "/tags", query)
271

    
272
  def DeleteClusterTags(self, tags, dry_run=False):
273
    """Deletes tags from the cluster.
274

275
    @type tags: list of str
276
    @param tags: tags to delete
277
    @type dry_run: bool
278
    @param dry_run: whether to perform a dry run
279

280
    """
281
    query = [("tag", t) for t in tags]
282
    if dry_run:
283
      query.append(("dry-run", 1))
284

    
285
    self._SendRequest(HTTP_DELETE, "/tags", query)
286

    
287
  def GetInstances(self, bulk=False):
288
    """Gets information about instances on the cluster.
289

290
    @type bulk: bool
291
    @param bulk: whether to return all information about all instances
292

293
    @rtype: list of dict or list of str
294
    @return: if bulk is True, info about the instances, else a list of instances
295

296
    """
297
    query = []
298
    if bulk:
299
      query.append(("bulk", 1))
300

    
301
    instances = self._SendRequest(HTTP_GET, "/instances", query)
302
    if bulk:
303
      return instances
304
    else:
305
      return [i["id"] for i in instances]
306

    
307

    
308
  def GetInstanceInfo(self, instance):
309
    """Gets information about an instance.
310

311
    @type instance: str
312
    @param instance: instance whose info to return
313

314
    @rtype: dict
315
    @return: info about the instance
316

317
    """
318
    return self._SendRequest(HTTP_GET, "/instances/%s" % instance)
319

    
320
  def CreateInstance(self, dry_run=False):
321
    """Creates a new instance.
322

323
    @type dry_run: bool
324
    @param dry_run: whether to perform a dry run
325

326
    @rtype: int
327
    @return: job id
328

329
    """
330
    # TODO: Pass arguments needed to actually create an instance.
331
    query = []
332
    if dry_run:
333
      query.append(("dry-run", 1))
334

    
335
    return self._SendRequest(HTTP_POST, "/instances", query)
336

    
337
  def DeleteInstance(self, instance, dry_run=False):
338
    """Deletes an instance.
339

340
    @type instance: str
341
    @param instance: the instance to delete
342

343
    @rtype: int
344
    @return: job id
345

346
    """
347
    query = []
348
    if dry_run:
349
      query.append(("dry-run", 1))
350

    
351
    return self._SendRequest(HTTP_DELETE, "/instances/%s" % instance, query)
352

    
353
  def GetInstanceTags(self, instance):
354
    """Gets tags for an instance.
355

356
    @type instance: str
357
    @param instance: instance whose tags to return
358

359
    @rtype: list of str
360
    @return: tags for the instance
361

362
    """
363
    return self._SendRequest(HTTP_GET, "/instances/%s/tags" % instance)
364

    
365
  def AddInstanceTags(self, instance, tags, dry_run=False):
366
    """Adds tags to an instance.
367

368
    @type instance: str
369
    @param instance: instance to add tags to
370
    @type tags: list of str
371
    @param tags: tags to add to the instance
372
    @type dry_run: bool
373
    @param dry_run: whether to perform a dry run
374

375
    @rtype: int
376
    @return: job id
377

378
    """
379
    query = [("tag", t) for t in tags]
380
    if dry_run:
381
      query.append(("dry-run", 1))
382

    
383
    return self._SendRequest(HTTP_PUT, "/instances/%s/tags" % instance, query)
384

    
385
  def DeleteInstanceTags(self, instance, tags, dry_run=False):
386
    """Deletes tags from an instance.
387

388
    @type instance: str
389
    @param instance: instance to delete tags from
390
    @type tags: list of str
391
    @param tags: tags to delete
392
    @type dry_run: bool
393
    @param dry_run: whether to perform a dry run
394

395
    """
396
    query = [("tag", t) for t in tags]
397
    if dry_run:
398
      query.append(("dry-run", 1))
399

    
400
    self._SendRequest(HTTP_DELETE, "/instances/%s/tags" % instance, query)
401

    
402
  def RebootInstance(self, instance, reboot_type=None, ignore_secondaries=None,
403
                     dry_run=False):
404
    """Reboots an instance.
405

406
    @type instance: str
407
    @param instance: instance to rebot
408
    @type reboot_type: str
409
    @param reboot_type: one of: hard, soft, full
410
    @type ignore_secondaries: bool
411
    @param ignore_secondaries: if True, ignores errors for the secondary node
412
        while re-assembling disks (in hard-reboot mode only)
413
    @type dry_run: bool
414
    @param dry_run: whether to perform a dry run
415

416
    """
417
    query = []
418
    if reboot_type:
419
      query.append(("type", reboot_type))
420
    if ignore_secondaries is not None:
421
      query.append(("ignore_secondaries", ignore_secondaries))
422
    if dry_run:
423
      query.append(("dry-run", 1))
424

    
425
    self._SendRequest(HTTP_POST, "/instances/%s/reboot" % instance, query)
426

    
427
  def ShutdownInstance(self, instance, dry_run=False):
428
    """Shuts down an instance.
429

430
    @type instance: str
431
    @param instance: the instance to shut down
432
    @type dry_run: bool
433
    @param dry_run: whether to perform a dry run
434

435
    """
436
    query = []
437
    if dry_run:
438
      query.append(("dry-run", 1))
439

    
440
    self._SendRequest(HTTP_PUT, "/instances/%s/shutdown" % instance, query)
441

    
442
  def StartupInstance(self, instance, dry_run=False):
443
    """Starts up an instance.
444

445
    @type instance: str
446
    @param instance: the instance to start up
447
    @type dry_run: bool
448
    @param dry_run: whether to perform a dry run
449

450
    """
451
    query = []
452
    if dry_run:
453
      query.append(("dry-run", 1))
454

    
455
    self._SendRequest(HTTP_PUT, "/instances/%s/startup" % instance, query)
456

    
457
  def ReinstallInstance(self, instance, os, no_startup=False):
458
    """Reinstalls an instance.
459

460
    @type instance: str
461
    @param instance: the instance to reinstall
462
    @type os: str
463
    @param os: the os to reinstall
464
    @type no_startup: bool
465
    @param no_startup: whether to start the instance automatically
466

467
    """
468
    query = [("os", os)]
469
    if no_startup:
470
      query.append(("nostartup", 1))
471
    self._SendRequest(HTTP_POST, "/instances/%s/reinstall" % instance, query)
472

    
473
  def ReplaceInstanceDisks(self, instance, disks, mode="replace_auto",
474
                           remote_node=None, iallocator="hail", dry_run=False):
475
    """Replaces disks on an instance.
476

477
    @type instance: str
478
    @param instance: instance whose disks to replace
479
    @type disks: list of str
480
    @param disks: disks to replace
481
    @type mode: str
482
    @param mode: replacement mode to use. defaults to replace_auto
483
    @type remote_node: str or None
484
    @param remote_node: new secondary node to use (for use with
485
        replace_new_secondary mdoe)
486
    @type iallocator: str or None
487
    @param iallocator: instance allocator plugin to use (for use with
488
        replace_auto mdoe).  default is hail
489
    @type dry_run: bool
490
    @param dry_run: whether to perform a dry run
491

492
    @rtype: int
493
    @return: job id
494

495
    @raises InvalidReplacementMode: If an invalid disk replacement mode is given
496
    @raises GanetiApiError: If no secondary node is given with a non-auto
497
        replacement mode is requested.
498

499
    """
500
    if mode not in VALID_REPLACEMENT_MODES:
501
      raise InvalidReplacementMode("%s is not a valid disk replacement mode.",
502
                                   mode)
503

    
504
    query = [("mode", mode), ("disks", ",".join(disks))]
505

    
506
    if mode is REPLACE_DISK_AUTO:
507
      query.append(("iallocator", iallocator))
508
    elif mode is REPLACE_DISK_SECONDARY:
509
      if remote_node is None:
510
        raise GanetiApiError("You must supply a new secondary node.")
511
      query.append(("remote_node", remote_node))
512

    
513
    if dry_run:
514
      query.append(("dry-run", 1))
515

    
516
    return self._SendRequest(HTTP_POST,
517
                             "/instances/%s/replace-disks" % instance, query)
518

    
519
  def GetJobs(self):
520
    """Gets all jobs for the cluster.
521

522
    @rtype: list of int
523
    @return: job ids for the cluster
524

525
    """
526
    return [int(j["id"]) for j in self._SendRequest(HTTP_GET, "/jobs")]
527

    
528
  def GetJobStatus(self, job_id):
529
    """Gets the status of a job.
530

531
    @type job_id: int
532
    @param job_id: job id whose status to query
533

534
    @rtype: dict
535
    @return: job status
536

537
    """
538
    return self._SendRequest(HTTP_GET, "/jobs/%d" % job_id)
539

    
540
  def DeleteJob(self, job_id, dry_run=False):
541
    """Deletes a job.
542

543
    @type job_id: int
544
    @param job_id: id of the job to delete
545
    @type dry_run: bool
546
    @param dry_run: whether to perform a dry run
547

548
    """
549
    query = []
550
    if dry_run:
551
      query.append(("dry-run", 1))
552

    
553
    self._SendRequest(HTTP_DELETE, "/jobs/%d" % job_id, query)
554

    
555
  def GetNodes(self, bulk=False):
556
    """Gets all nodes in the cluster.
557

558
    @type bulk: bool
559
    @param bulk: whether to return all information about all instances
560

561
    @rtype: list of dict or str
562
    @return: if bulk is true, info about nodes in the cluster,
563
        else list of nodes in the cluster
564

565
    """
566
    query = []
567
    if bulk:
568
      query.append(("bulk", 1))
569

    
570
    nodes = self._SendRequest(HTTP_GET, "/nodes", query)
571
    if bulk:
572
      return nodes
573
    else:
574
      return [n["id"] for n in nodes]
575

    
576
  def GetNodeInfo(self, node):
577
    """Gets information about a node.
578

579
    @type node: str
580
    @param node: node whose info to return
581

582
    @rtype: dict
583
    @return: info about the node
584

585
    """
586
    return self._SendRequest(HTTP_GET, "/nodes/%s" % node)
587

    
588
  def EvacuateNode(self, node, iallocator=None, remote_node=None,
589
                   dry_run=False):
590
    """Evacuates instances from a Ganeti node.
591

592
    @type node: str
593
    @param node: node to evacuate
594
    @type iallocator: str or None
595
    @param iallocator: instance allocator to use
596
    @type remote_node: str
597
    @param remote_node: node to evaucate to
598
    @type dry_run: bool
599
    @param dry_run: whether to perform a dry run
600

601
    @rtype: int
602
    @return: job id
603

604
    @raises GanetiApiError: if an iallocator and remote_node are both specified
605

606
    """
607
    query = []
608
    if iallocator and remote_node:
609
      raise GanetiApiError("Only one of iallocator or remote_node can be used.")
610

    
611
    if iallocator:
612
      query.append(("iallocator", iallocator))
613
    if remote_node:
614
      query.append(("remote_node", remote_node))
615
    if dry_run:
616
      query.append(("dry-run", 1))
617

    
618
    return self._SendRequest(HTTP_POST, "/nodes/%s/evacuate" % node, query)
619

    
620
  def MigrateNode(self, node, live=True, dry_run=False):
621
    """Migrates all primary instances from a node.
622

623
    @type node: str
624
    @param node: node to migrate
625
    @type live: bool
626
    @param live: whether to use live migration
627
    @type dry_run: bool
628
    @param dry_run: whether to perform a dry run
629

630
    @rtype: int
631
    @return: job id
632

633
    """
634
    query = []
635
    if live:
636
      query.append(("live", 1))
637
    if dry_run:
638
      query.append(("dry-run", 1))
639

    
640
    return self._SendRequest(HTTP_POST, "/nodes/%s/migrate" % node, query)
641

    
642
  def GetNodeRole(self, node):
643
    """Gets the current role for a node.
644

645
    @type node: str
646
    @param node: node whose role to return
647

648
    @rtype: str
649
    @return: the current role for a node
650

651
    """
652
    return self._SendRequest(HTTP_GET, "/nodes/%s/role" % node)
653

    
654
  def SetNodeRole(self, node, role, force=False):
655
    """Sets the role for a node.
656

657
    @type node: str
658
    @param node: the node whose role to set
659
    @type role: str
660
    @param role: the role to set for the node
661
    @type force: bool
662
    @param force: whether to force the role change
663

664
    @rtype: int
665
    @return: job id
666

667
    @raise InvalidNodeRole: If an invalid node role is specified
668

669
    """
670
    if role not in VALID_NODE_ROLES:
671
      raise InvalidNodeRole("%s is not a valid node role.", role)
672

    
673
    query = [("force", force)]
674
    return self._SendRequest(HTTP_PUT, "/nodes/%s/role" % node, query,
675
                             content=role)
676

    
677
  def GetNodeStorageUnits(self, node, storage_type, output_fields):
678
    """Gets the storage units for a node.
679

680
    @type node: str
681
    @param node: the node whose storage units to return
682
    @type storage_type: str
683
    @param storage_type: storage type whose units to return
684
    @type output_fields: str
685
    @param output_fields: storage type fields to return
686

687
    @rtype: int
688
    @return: job id where results can be retrieved
689

690
    @raise InvalidStorageType: If an invalid storage type is specified
691

692
    """
693
    # TODO: Add default for storage_type & output_fields
694
    if storage_type not in VALID_STORAGE_TYPES:
695
      raise InvalidStorageType("%s is an invalid storage type.", storage_type)
696

    
697
    query = [("storage_type", storage_type), ("output_fields", output_fields)]
698
    return self._SendRequest(HTTP_GET, "/nodes/%s/storage" % node, query)
699

    
700
  def ModifyNodeStorageUnits(self, node, storage_type, name, allocatable=True):
701
    """Modifies parameters of storage units on the node.
702

703
    @type node: str
704
    @param node: node whose storage units to modify
705
    @type storage_type: str
706
    @param storage_type: storage type whose units to modify
707
    @type name: str
708
    @param name: name of the storage unit
709
    @type allocatable: bool
710
    @param allocatable: TODO: Document me
711

712
    @rtype: int
713
    @return: job id
714

715
    @raise InvalidStorageType: If an invalid storage type is specified
716

717
    """
718
    if storage_type not in VALID_STORAGE_TYPES:
719
      raise InvalidStorageType("%s is an invalid storage type.", storage_type)
720

    
721
    query = [
722
        ("storage_type", storage_type), ("name", name),
723
        ("allocatable", allocatable)
724
        ]
725
    return self._SendRequest(HTTP_PUT, "/nodes/%s/storage/modify" % node, query)
726

    
727
  def RepairNodeStorageUnits(self, node, storage_type, name):
728
    """Repairs a storage unit on the node.
729

730
    @type node: str
731
    @param node: node whose storage units to repair
732
    @type storage_type: str
733
    @param storage_type: storage type to repair
734
    @type name: str
735
    @param name: name of the storage unit to repair
736

737
    @rtype: int
738
    @return: job id
739

740
    @raise InvalidStorageType: If an invalid storage type is specified
741

742
    """
743
    if storage_type not in VALID_STORAGE_TYPES:
744
      raise InvalidStorageType("%s is an invalid storage type.", storage_type)
745

    
746
    query = [("storage_type", storage_type), ("name", name)]
747
    return self._SendRequest(HTTP_PUT, "/nodes/%s/storage/repair" % node, query)
748

    
749
  def GetNodeTags(self, node):
750
    """Gets the tags for a node.
751

752
    @type node: str
753
    @param node: node whose tags to return
754

755
    @rtype: list of str
756
    @return: tags for the node
757

758
    """
759
    return self._SendRequest(HTTP_GET, "/nodes/%s/tags" % node)
760

    
761
  def AddNodeTags(self, node, tags, dry_run=False):
762
    """Adds tags to a node.
763

764
    @type node: str
765
    @param node: node to add tags to
766
    @type tags: list of str
767
    @param tags: tags to add to the node
768
    @type dry_run: bool
769
    @param dry_run: whether to perform a dry run
770

771
    @rtype: int
772
    @return: job id
773

774
    """
775
    query = [("tag", t) for t in tags]
776
    if dry_run:
777
      query.append(("dry-run", 1))
778

    
779
    return self._SendRequest(HTTP_PUT, "/nodes/%s/tags" % node, query,
780
                             content=tags)
781

    
782
  def DeleteNodeTags(self, node, tags, dry_run=False):
783
    """Delete tags from a node.
784

785
    @type node: str
786
    @param node: node to remove tags from
787
    @type tags: list of str
788
    @param tags: tags to remove from the node
789
    @type dry_run: bool
790
    @param dry_run: whether to perform a dry run
791

792
    @rtype: int
793
    @return: job id
794

795
    """
796
    query = [("tag", t) for t in tags]
797
    if dry_run:
798
      query.append(("dry-run", 1))
799

    
800
    return self._SendRequest(HTTP_DELETE, "/nodes/%s/tags" % node, query)
801

    
802

    
803
class HTTPSConnectionOpenSSL(httplib.HTTPSConnection):
804
  """HTTPS Connection handler that verifies the SSL certificate.
805

806
  """
807

    
808
  # pylint: disable-msg=W0142
809
  def __init__(self, *args, **kwargs):
810
    """Constructor.
811

812
    """
813
    httplib.HTTPSConnection.__init__(self, *args, **kwargs)
814

    
815
    self._ssl_cert = None
816
    if self.cert_file:
817
      f = open(self.cert_file, "r")
818
      self._ssl_cert = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
819
      f.close()
820

    
821
  # pylint: disable-msg=W0613
822
  def _VerifySSLCertCallback(self, conn, cert, errnum, errdepth, ok):
823
    """Verifies the SSL certificate provided by the peer.
824

825
    """
826
    return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
827
            self._ssl_cert.digest("md5") == cert.digest("md5"))
828

    
829
  def connect(self):
830
    """Connect to the server specified when the object was created.
831

832
    This ensures that SSL certificates are verified.
833

834
    """
835
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
836
    ctx = SSL.Context(SSL.SSLv23_METHOD)
837
    ctx.set_options(SSL.OP_NO_SSLv2)
838
    ctx.use_certificate(self._ssl_cert)
839
    ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
840
                   self._VerifySSLCertCallback)
841

    
842
    ssl = SSL.Connection(ctx, sock)
843
    ssl.connect((self.host, self.port))
844
    self.sock = httplib.FakeSocket(sock, ssl)