class Client(object):
- MAX_THREADS = 7
def __init__(self, base_url, token, http_client=KamakiHTTPConnection()):
self.base_url = base_url
'%A, %d-%b-%y %H:%M:%S GMT',
'%a, %d %b %Y %H:%M:%S GMT']
self.http_client = http_client
+ self.MAX_THREADS = 7
def _init_thread_limit(self, limit=1):
+ assert isinstance(limit, int) and limit > 0, 'Thread limit not a +int'
self._thread_limit = limit
self._elapsed_old = 0.0
self._elapsed_new = 0.0
def _watch_thread_limit(self, threadlist):
    """Adapt the thread limit to the two most recent elapsed-time samples.

    The limit grows by one when a previous sample exists, the newest sample
    is not slower than it, and the limit is still below MAX_THREADS.
    Otherwise it shrinks by one when the newest sample is not faster than
    the previous one and the limit is above 1. In every case the newest
    sample then becomes the previous one.

    :param threadlist: (list) running threads; in the code visible here
        only its length is used, for debug logging

    NOTE(review): this view of the method ends after the sample roll-over;
    confirm against the full file whether more statements follow.
    """
    # Fall back to defaults when _init_thread_limit has not been called
    self._thread_limit = getattr(self, '_thread_limit', 1)
    self._elapsed_new = getattr(self, '_elapsed_new', 0.0)
    self._elapsed_old = getattr(self, '_elapsed_old', 0.0)
    recvlog.debug('# running threads: %s' % len(threadlist))
    # _elapsed_old stays 0.0 (falsy) until a first sample was rolled over;
    # with nothing to compare against, the limit must not grow yet
    if self._elapsed_old and self._elapsed_old >= self._elapsed_new and (
            self._thread_limit < self.MAX_THREADS):
        self._thread_limit += 1
    elif self._elapsed_old <= self._elapsed_new and self._thread_limit > 1:
        self._thread_limit -= 1
    # Roll the sample over on every call, whichever branch was taken
    self._elapsed_old = self._elapsed_new
from time import sleep
from inspect import getmembers, isclass
from json import loads
+from random import randint
from kamaki.clients.connection.test import (
KamakiConnection,
self.assertEqual(self.client.DATE_FORMATS, DATE_FORMATS)
self.assertTrue(isinstance(self.client.http_client, FakeConnection))
def test__init_thread_limit(self):
    """Check _init_thread_limit input validation and the state it sets."""
    unset = 'Nothing set here'
    tracked = ('_thread_limit', '_elapsed_old', '_elapsed_new')
    # An invalid limit must raise before any attribute is assigned
    for bad_limit in (-1, 0.5, 'a string', {}):
        self.assertRaises(
            AssertionError, self.client._init_thread_limit, bad_limit)
        for attr in tracked:
            self.assertEqual(unset, getattr(self.client, attr, unset))
    # A valid limit is stored and both timing samples start at zero
    self.client._init_thread_limit(42)
    self.assertEqual(42, self.client._thread_limit)
    for attr in ('_elapsed_old', '_elapsed_new'):
        self.assertEqual(0.0, getattr(self.client, attr))

+
def test__watch_thread_limit(self):
    """Feed timing samples to _watch_thread_limit and check the limit.

    Each scenario in ``waits`` is a dict with:
    - args: (elapsed time, expected thread limit) pairs, applied in order
    - max: optional MAX_THREADS override (7 when absent)
    - limit: optional initial thread limit (the default when absent)
    """
    waits = (
        dict(args=((0.1, 1), (0.1, 2), (0.2, 1), (0.7, 1), (0.3, 2))),
        dict(args=tuple(
            (1.0 - (i / 10.0), (i + 1)) for i in range(7))),
        # Draw ten independent samples (list repetition would reuse a
        # single one): whatever the timings do, the limit cannot leave 1
        # while MAX_THREADS is 1
        dict(
            max=1,
            args=tuple((randint(1, 10) / 3.0, 1) for _ in range(10))),
        dict(
            limit=5,
            args=tuple([
                (1.0 + (i / 10.0), (5 - i - 1)) for i in range(4)] + [
                (2.0, 1), (1.9, 2), (2.0, 1), (2.0, 2)])),
        dict(args=tuple(
            [(1.0 - (i / 10.0), (i + 1)) for i in range(7)] + [
                (0.1, 7), (0.2, 6), (0.4, 5),
                (0.3, 6), (0.2, 7), (0.1, 7)])),)
    for wait_dict in waits:
        self.client.MAX_THREADS = wait_dict.get('max', 7)
        if 'limit' in wait_dict:
            self.client._init_thread_limit(wait_dict['limit'])
        else:
            self.client._init_thread_limit()
            # Without any timing sample the default limit must not move
            self.client._watch_thread_limit(list())
            self.assertEqual(1, self.client._thread_limit)
        for wait, exp_limit in wait_dict['args']:
            self.client._elapsed_new = wait
            self.client._watch_thread_limit(list())
            self.assertEqual(exp_limit, self.client._thread_limit)

+
# TestCase auxiliary methods