Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 346c3037

History | View | Annotate | Download (16.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import compat
39
from ganeti import serializer
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import utils
43
from ganeti import objects
44
from ganeti import pathutils
45

    
46

    
47
KEY_METHOD = "method"
48
KEY_ARGS = "args"
49
KEY_SUCCESS = "success"
50
KEY_RESULT = "result"
51
KEY_VERSION = "version"
52

    
53
REQ_SUBMIT_JOB = "SubmitJob"
54
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = "SubmitJobToDrainedQueue"
55
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
56
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
57
REQ_CANCEL_JOB = "CancelJob"
58
REQ_ARCHIVE_JOB = "ArchiveJob"
59
REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
60
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
61
REQ_QUERY = "Query"
62
REQ_QUERY_FIELDS = "QueryFields"
63
REQ_QUERY_JOBS = "QueryJobs"
64
REQ_QUERY_INSTANCES = "QueryInstances"
65
REQ_QUERY_NODES = "QueryNodes"
66
REQ_QUERY_GROUPS = "QueryGroups"
67
REQ_QUERY_NETWORKS = "QueryNetworks"
68
REQ_QUERY_EXPORTS = "QueryExports"
69
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
70
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
71
REQ_QUERY_TAGS = "QueryTags"
72
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
73
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
74

    
75
#: List of all LUXI requests
76
REQ_ALL = compat.UniqueFrozenset([
77
  REQ_ARCHIVE_JOB,
78
  REQ_AUTO_ARCHIVE_JOBS,
79
  REQ_CANCEL_JOB,
80
  REQ_CHANGE_JOB_PRIORITY,
81
  REQ_QUERY,
82
  REQ_QUERY_CLUSTER_INFO,
83
  REQ_QUERY_CONFIG_VALUES,
84
  REQ_QUERY_EXPORTS,
85
  REQ_QUERY_FIELDS,
86
  REQ_QUERY_GROUPS,
87
  REQ_QUERY_INSTANCES,
88
  REQ_QUERY_JOBS,
89
  REQ_QUERY_NODES,
90
  REQ_QUERY_NETWORKS,
91
  REQ_QUERY_TAGS,
92
  REQ_SET_DRAIN_FLAG,
93
  REQ_SET_WATCHER_PAUSE,
94
  REQ_SUBMIT_JOB,
95
  REQ_SUBMIT_JOB_TO_DRAINED_QUEUE,
96
  REQ_SUBMIT_MANY_JOBS,
97
  REQ_WAIT_FOR_JOB_CHANGE,
98
  ])
99

    
100
DEF_CTMO = 10
101
DEF_RWTO = 60
102

    
103
# WaitForJobChange timeout
104
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
105

    
106

    
107
class ProtocolError(errors.LuxiError):
108
  """Denotes an error in the LUXI protocol."""
109

    
110

    
111
class ConnectionClosedError(ProtocolError):
112
  """Connection closed error."""
113

    
114

    
115
class TimeoutError(ProtocolError):
116
  """Operation timeout error."""
117

    
118

    
119
class RequestError(ProtocolError):
120
  """Error on request.
121

122
  This signifies an error in the request format or request handling,
123
  but not (e.g.) an error in starting up an instance.
124

125
  Some common conditions that can trigger this exception:
126
    - job submission failed because the job data was wrong
127
    - query failed because required fields were missing
128

129
  """
130

    
131

    
132
class NoMasterError(ProtocolError):
133
  """The master cannot be reached.
134

135
  This means that the master daemon is not running or the socket has
136
  been removed.
137

138
  """
139

    
140

    
141
class PermissionError(ProtocolError):
142
  """Permission denied while connecting to the master socket.
143

144
  This means the user doesn't have the proper rights.
145

146
  """
147

    
148

    
149
class Transport:
150
  """Low-level transport class.
151

152
  This is used on the client side.
153

154
  This could be replace by any other class that provides the same
155
  semantics to the Client. This means:
156
    - can send messages and receive messages
157
    - safe for multithreading
158

159
  """
160

    
161
  def __init__(self, address, timeouts=None):
162
    """Constructor for the Client class.
163

164
    Arguments:
165
      - address: a valid address the the used transport class
166
      - timeout: a list of timeouts, to be used on connect and read/write
167

168
    There are two timeouts used since we might want to wait for a long
169
    time for a response, but the connect timeout should be lower.
170

171
    If not passed, we use a default of 10 and respectively 60 seconds.
172

173
    Note that on reading data, since the timeout applies to an
174
    invidual receive, it might be that the total duration is longer
175
    than timeout value passed (we make a hard limit at twice the read
176
    timeout).
177

178
    """
179
    self.address = address
180
    if timeouts is None:
181
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
182
    else:
183
      self._ctimeout, self._rwtimeout = timeouts
184

    
185
    self.socket = None
186
    self._buffer = ""
187
    self._msgs = collections.deque()
188

    
189
    try:
190
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
191

    
192
      # Try to connect
193
      try:
194
        utils.Retry(self._Connect, 1.0, self._ctimeout,
195
                    args=(self.socket, address, self._ctimeout))
196
      except utils.RetryTimeout:
197
        raise TimeoutError("Connect timed out")
198

    
199
      self.socket.settimeout(self._rwtimeout)
200
    except (socket.error, NoMasterError):
201
      if self.socket is not None:
202
        self.socket.close()
203
      self.socket = None
204
      raise
205

    
206
  @staticmethod
207
  def _Connect(sock, address, timeout):
208
    sock.settimeout(timeout)
209
    try:
210
      sock.connect(address)
211
    except socket.timeout, err:
212
      raise TimeoutError("Connect timed out: %s" % str(err))
213
    except socket.error, err:
214
      error_code = err.args[0]
215
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
216
        raise NoMasterError(address)
217
      elif error_code in (errno.EPERM, errno.EACCES):
218
        raise PermissionError(address)
219
      elif error_code == errno.EAGAIN:
220
        # Server's socket backlog is full at the moment
221
        raise utils.RetryAgain()
222
      raise
223

    
224
  def _CheckSocket(self):
225
    """Make sure we are connected.
226

227
    """
228
    if self.socket is None:
229
      raise ProtocolError("Connection is closed")
230

    
231
  def Send(self, msg):
232
    """Send a message.
233

234
    This just sends a message and doesn't wait for the response.
235

236
    """
237
    if constants.LUXI_EOM in msg:
238
      raise ProtocolError("Message terminator found in payload")
239

    
240
    self._CheckSocket()
241
    try:
242
      # TODO: sendall is not guaranteed to send everything
243
      self.socket.sendall(msg + constants.LUXI_EOM)
244
    except socket.timeout, err:
245
      raise TimeoutError("Sending timeout: %s" % str(err))
246

    
247
  def Recv(self):
248
    """Try to receive a message from the socket.
249

250
    In case we already have messages queued, we just return from the
251
    queue. Otherwise, we try to read data with a _rwtimeout network
252
    timeout, and making sure we don't go over 2x_rwtimeout as a global
253
    limit.
254

255
    """
256
    self._CheckSocket()
257
    etime = time.time() + self._rwtimeout
258
    while not self._msgs:
259
      if time.time() > etime:
260
        raise TimeoutError("Extended receive timeout")
261
      while True:
262
        try:
263
          data = self.socket.recv(4096)
264
        except socket.timeout, err:
265
          raise TimeoutError("Receive timeout: %s" % str(err))
266
        except socket.error, err:
267
          if err.args and err.args[0] == errno.EAGAIN:
268
            continue
269
          raise
270
        break
271
      if not data:
272
        raise ConnectionClosedError("Connection closed while reading")
273
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
274
      self._buffer = new_msgs.pop()
275
      self._msgs.extend(new_msgs)
276
    return self._msgs.popleft()
277

    
278
  def Call(self, msg):
279
    """Send a message and wait for the response.
280

281
    This is just a wrapper over Send and Recv.
282

283
    """
284
    self.Send(msg)
285
    return self.Recv()
286

    
287
  def Close(self):
288
    """Close the socket"""
289
    if self.socket is not None:
290
      self.socket.close()
291
      self.socket = None
292

    
293

    
294
def ParseRequest(msg):
295
  """Parses a LUXI request message.
296

297
  """
298
  try:
299
    request = serializer.LoadJson(msg)
300
  except ValueError, err:
301
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
302

    
303
  logging.debug("LUXI request: %s", request)
304

    
305
  if not isinstance(request, dict):
306
    logging.error("LUXI request not a dict: %r", msg)
307
    raise ProtocolError("Invalid LUXI request (not a dict)")
308

    
309
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
310
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
311
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
312

    
313
  if method is None or args is None:
314
    logging.error("LUXI request missing method or arguments: %r", msg)
315
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
316
                         " in request): %r") % msg)
317

    
318
  return (method, args, version)
319

    
320

    
321
def ParseResponse(msg):
322
  """Parses a LUXI response message.
323

324
  """
325
  # Parse the result
326
  try:
327
    data = serializer.LoadJson(msg)
328
  except KeyboardInterrupt:
329
    raise
330
  except Exception, err:
331
    raise ProtocolError("Error while deserializing response: %s" % str(err))
332

    
333
  # Validate response
334
  if not (isinstance(data, dict) and
335
          KEY_SUCCESS in data and
336
          KEY_RESULT in data):
337
    raise ProtocolError("Invalid response from server: %r" % data)
338

    
339
  return (data[KEY_SUCCESS], data[KEY_RESULT],
340
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
341

    
342

    
343
def FormatResponse(success, result, version=None):
344
  """Formats a LUXI response message.
345

346
  """
347
  response = {
348
    KEY_SUCCESS: success,
349
    KEY_RESULT: result,
350
    }
351

    
352
  if version is not None:
353
    response[KEY_VERSION] = version
354

    
355
  logging.debug("LUXI response: %s", response)
356

    
357
  return serializer.DumpJson(response)
358

    
359

    
360
def FormatRequest(method, args, version=None):
361
  """Formats a LUXI request message.
362

363
  """
364
  # Build request
365
  request = {
366
    KEY_METHOD: method,
367
    KEY_ARGS: args,
368
    }
369

    
370
  if version is not None:
371
    request[KEY_VERSION] = version
372

    
373
  # Serialize the request
374
  return serializer.DumpJson(request)
375

    
376

    
377
def CallLuxiMethod(transport_cb, method, args, version=None):
378
  """Send a LUXI request via a transport and return the response.
379

380
  """
381
  assert callable(transport_cb)
382

    
383
  request_msg = FormatRequest(method, args, version=version)
384

    
385
  # Send request and wait for response
386
  response_msg = transport_cb(request_msg)
387

    
388
  (success, result, resp_version) = ParseResponse(response_msg)
389

    
390
  # Verify version if there was one in the response
391
  if resp_version is not None and resp_version != version:
392
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
393
                           (version, resp_version))
394

    
395
  if success:
396
    return result
397

    
398
  errors.MaybeRaise(result)
399
  raise RequestError(result)
400

    
401

    
402
class Client(object):
403
  """High-level client implementation.
404

405
  This uses a backing Transport-like class on top of which it
406
  implements data serialization/deserialization.
407

408
  """
409
  def __init__(self, address=None, timeouts=None, transport=Transport):
410
    """Constructor for the Client class.
411

412
    Arguments:
413
      - address: a valid address the the used transport class
414
      - timeout: a list of timeouts, to be used on connect and read/write
415
      - transport: a Transport-like class
416

417

418
    If timeout is not passed, the default timeouts of the transport
419
    class are used.
420

421
    """
422
    if address is None:
423
      address = pathutils.MASTER_SOCKET
424
    self.address = address
425
    self.timeouts = timeouts
426
    self.transport_class = transport
427
    self.transport = None
428
    self._InitTransport()
429

    
430
  def _InitTransport(self):
431
    """(Re)initialize the transport if needed.
432

433
    """
434
    if self.transport is None:
435
      self.transport = self.transport_class(self.address,
436
                                            timeouts=self.timeouts)
437

    
438
  def _CloseTransport(self):
439
    """Close the transport, ignoring errors.
440

441
    """
442
    if self.transport is None:
443
      return
444
    try:
445
      old_transp = self.transport
446
      self.transport = None
447
      old_transp.Close()
448
    except Exception: # pylint: disable=W0703
449
      pass
450

    
451
  def _SendMethodCall(self, data):
452
    # Send request and wait for response
453
    try:
454
      self._InitTransport()
455
      return self.transport.Call(data)
456
    except Exception:
457
      self._CloseTransport()
458
      raise
459

    
460
  def Close(self):
461
    """Close the underlying connection.
462

463
    """
464
    self._CloseTransport()
465

    
466
  def CallMethod(self, method, args):
467
    """Send a generic request and return the response.
468

469
    """
470
    if not isinstance(args, (list, tuple)):
471
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
472
                                   " expected list, got %s" % type(args))
473
    return CallLuxiMethod(self._SendMethodCall, method, args,
474
                          version=constants.LUXI_VERSION)
475

    
476
  def SetQueueDrainFlag(self, drain_flag):
477
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
478

    
479
  def SetWatcherPause(self, until):
480
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
481

    
482
  def SubmitJob(self, ops):
483
    ops_state = map(lambda op: op.__getstate__(), ops)
484
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
485

    
486
  def SubmitJobToDrainedQueue(self, ops):
487
    ops_state = map(lambda op: op.__getstate__(), ops)
488
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
489

    
490
  def SubmitManyJobs(self, jobs):
491
    jobs_state = []
492
    for ops in jobs:
493
      jobs_state.append([op.__getstate__() for op in ops])
494
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
495

    
496
  def CancelJob(self, job_id):
497
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
498

    
499
  def ArchiveJob(self, job_id):
500
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
501

    
502
  def ChangeJobPriority(self, job_id, priority):
503
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
504

    
505
  def AutoArchiveJobs(self, age):
506
    timeout = (DEF_RWTO - 1) / 2
507
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
508

    
509
  def WaitForJobChangeOnce(self, job_id, fields,
510
                           prev_job_info, prev_log_serial,
511
                           timeout=WFJC_TIMEOUT):
512
    """Waits for changes on a job.
513

514
    @param job_id: Job ID
515
    @type fields: list
516
    @param fields: List of field names to be observed
517
    @type prev_job_info: None or list
518
    @param prev_job_info: Previously received job information
519
    @type prev_log_serial: None or int/long
520
    @param prev_log_serial: Highest log serial number previously received
521
    @type timeout: int/float
522
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
523
                    be capped to that value)
524

525
    """
526
    assert timeout >= 0, "Timeout can not be negative"
527
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
528
                           (job_id, fields, prev_job_info,
529
                            prev_log_serial,
530
                            min(WFJC_TIMEOUT, timeout)))
531

    
532
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
533
    while True:
534
      result = self.WaitForJobChangeOnce(job_id, fields,
535
                                         prev_job_info, prev_log_serial)
536
      if result != constants.JOB_NOTCHANGED:
537
        break
538
    return result
539

    
540
  def Query(self, what, fields, qfilter):
541
    """Query for resources/items.
542

543
    @param what: One of L{constants.QR_VIA_LUXI}
544
    @type fields: List of strings
545
    @param fields: List of requested fields
546
    @type qfilter: None or list
547
    @param qfilter: Query filter
548
    @rtype: L{objects.QueryResponse}
549

550
    """
551
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
552
    return objects.QueryResponse.FromDict(result)
553

    
554
  def QueryFields(self, what, fields):
555
    """Query for available fields.
556

557
    @param what: One of L{constants.QR_VIA_LUXI}
558
    @type fields: None or list of strings
559
    @param fields: List of requested fields
560
    @rtype: L{objects.QueryFieldsResponse}
561

562
    """
563
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
564
    return objects.QueryFieldsResponse.FromDict(result)
565

    
566
  def QueryJobs(self, job_ids, fields):
567
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
568

    
569
  def QueryInstances(self, names, fields, use_locking):
570
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
571

    
572
  def QueryNodes(self, names, fields, use_locking):
573
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
574

    
575
  def QueryGroups(self, names, fields, use_locking):
576
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
577

    
578
  def QueryNetworks(self, names, fields, use_locking):
579
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
580

    
581
  def QueryExports(self, nodes, use_locking):
582
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
583

    
584
  def QueryClusterInfo(self):
585
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
586

    
587
  def QueryConfigValues(self, fields):
588
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
589

    
590
  def QueryTags(self, kind, name):
591
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))