Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / commissioning / specificator.py @ 54069d1b

History | View | Annotate | Download (22 kB)

1
# -*- coding: utf-8 -*-
2
# Copyright 2012 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or
5
# without modification, are permitted provided that the following
6
# conditions are met:
7
#
8
#   1. Redistributions of source code must retain the above
9
#      copyright notice, this list of conditions and the following
10
#      disclaimer.
11
#
12
#   2. Redistributions in binary form must reproduce the above
13
#      copyright notice, this list of conditions and the following
14
#      disclaimer in the documentation and/or other materials
15
#      provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
#
30
# The views and conclusions contained in the software and
31
# documentation are those of the authors and should not be
32
# interpreted as representing official policies, either expressed
33
# or implied, of GRNET S.A.
34

    
35
from random import random, choice, randint
36
from math import log
37
from inspect import isclass
38
from .utils.argmap import (argmap_decode, argmap_check, argmap_unpack_dict,
39
                           argmap_unpack_list)
40

    
41
try:
42
    from collections import OrderedDict
43
except ImportError:
44
    from .utils.ordereddict import OrderedDict
45

    
46
def shorts(s):
47
    if not isinstance(s, unicode):
48
        s = str(s)
49

    
50
    if len(s) <= 64:
51
        return s
52

    
53
    return s[:61] + '...'
54

    
55

    
56
class CanonifyException(Exception):
57
    pass
58

    
59
class SpecifyException(Exception):
60
    pass
61

    
62

    
63
class Canonical(object):
64

    
65
    _random_choice = None
66

    
67
    def __init__(self, *args, **kw):
68
        self.args = []
69
        named_args = []
70
        for a in args:
71
            if isinstance(a, tuple) and len(a) == 2:
72
                named_args.append(a)
73
            else:
74
                self.args.append(a)
75
        ordered_dict = OrderedDict(named_args)
76

    
77
        self.name = kw.pop('classname', self.__class__.__name__)
78
        random_choice = kw.pop('random', None)
79
        if random_choice is not None:
80
            self.random_choice = random_choice
81
        opts = {}
82
        for k, v in kw.items():
83
            if not isinstance(v, Canonical):
84
                if isclass(v) and issubclass(v, Canonical):
85
                    m = ("argument '%s': value '%s' is a Canonical _class_. "
86
                         "Perhaps you meant to specify a Canonical _instance_"
87
                         % (k, v))
88
                    raise SpecifyException(m)
89
                opts[k] = v
90
                del kw[k]
91

    
92
        self.opts = opts
93
        ordered_dict.update(kw)
94
        self.kw = ordered_dict
95
        self.init()
96

    
97
        if 'default' in opts:
98
            item = opts['default']
99
            if item is None:
100
                opts['null'] = 1
101
            else:
102
                opts['default'] = self._check(item)
103

    
104
    def init(self):
105
        return
106

    
107
    def __call__(self, item):
108
        return self.check(item)
109

    
110
    def check(self, item):
111
        if argmap_check(item):
112
            item = self._unpack(item)
113

    
114
        opts = self.opts
115
        if item is None and 'default' in opts:
116
            item = opts['default']
117

    
118
        can_be_null = opts.get('null', False)
119
        if item is None and can_be_null:
120
            return None
121

    
122
        return self._check(item)
123

    
124
    def _check(self, item):
125
        return item
126

    
127
    def _unpack(self, item):
128
        return argmap_unpack_list(item)
129

    
130
    def create(self):
131
        return None
132

    
133
    def random(self, **kw):
134
        random_choice = self._random_choice
135
        if random_choice is None:
136
            return None
137

    
138
        if callable(random_choice):
139
            return random_choice(kw)
140

    
141
        if isinstance(random_choice, str):
142
            return getattr(self, random_choice)(kw)
143

    
144
        return choice(random_choice)
145

    
146
    def tostring(self, depth=0, showopts=0, multiline=0):
147
        depth += 1
148
        if not multiline:
149
            argdepth = ''
150
            owndepth = ''
151
            joinchar = ','
152
            padchar = ''
153
        else:
154
            argdepth = '    ' * depth
155
            owndepth = '    ' * (depth - 1)
156
            joinchar = ',\n'
157
            padchar = '\n'
158

    
159
        args = [a.tostring( depth=depth,
160
                            showopts=showopts,
161
                            multiline=multiline) for a in self.args]
162
        args += [("%s=%s" %
163
                    (k, v.tostring( depth=depth,
164
                                    showopts=showopts,
165
                                    multiline=multiline)))
166

    
167
                                    for k, v in self.kw.items()]
168
        if showopts:
169
            args += [("%s=%s" % (k, str(v))) for k, v in self.opts.items()]
170

    
171
        if len(args) == 0:
172
            string = "%s(%s)" % (self.name, ','.join(args))
173
        else:
174
            string = "%s(%s" % (self.name, padchar)
175
            for arg in args:
176
                string += argdepth + arg + joinchar
177
            string = string[:-1] + padchar
178
            string += owndepth + ")"
179

    
180
        return string
181

    
182
    __str__ = tostring
183

    
184
    def __repr__(self):
185
        return self.tostring(multiline=0, showopts=1)
186

    
187
    def show(self):
188
        showable = self.opts.get('show', True)
189
        return self._show() if showable else ''
190

    
191
    def _show(self):
192
        return self.name
193

    
194
class Null(Canonical):
195

    
196
    def _check(self, item):
197
        return None
198

    
199
Nothing = Null()
200

    
201

    
202
class Integer(Canonical):
203

    
204
    def _check(self, item):
205
        try:
206
            num = long(item)
207
        except ValueError, e:
208
            try:
209
                num = long(item, 16)
210
            except Exception:
211
                m = "%s: cannot convert '%s' to long" % (self, shorts(item))
212
                raise CanonifyException(m)
213
        except TypeError, e:
214
            m = "%s: cannot convert '%s' to long" % (self, shorts(item))
215
            raise CanonifyException(m)
216

    
217
        optget = self.opts.get
218
        minimum = optget('minimum', None)
219
        maximum = optget('maximum', None)
220

    
221
        if minimum is not None and num < minimum:
222
            m = "%s: %d < minimum=%d" % (self, num, minimum)
223
            raise CanonifyException(m)
224

    
225
        if maximum is not None and num > maximum:
226
            m = "%s: %d > maximum=%d" % (self, num, maximum)
227
            raise CanonifyException(m)
228

    
229
        return num
230

    
231
    def _random_choice(self, kw):
232
        optget = self.opts.get
233
        kwget = kw.get
234
        minimum = kwget('minimum', optget('minimum', -4294967296L))
235
        maximum = kwget('maximum', optget('maximum', 4294967295L))
236
        r = random()
237
        if r < 0.1:
238
            return minimum
239
        if r < 0.2:
240
            return maximum
241
        if minimum <= 0 and maximum >= 0 and r < 0.3:
242
            return 0L
243
        return long(minimum + r * (maximum - minimum))
244

    
245

    
246

    
247
Serial = Integer(
248
            classname   =   'Serial',
249
            null        =   True,
250
)
251

    
252

    
253
class Text(Canonical):
254

    
255
    re = None
256
    matcher = None
257
    choices = None
258

    
259
    def init(self):
260
        opts = self.opts
261
        if 'regex' in opts:
262
            pat = opts['regex']
263
            re = self.re
264
            if re is None:
265
                import re
266
                self.re = re
267

    
268
            self.matcher = re.compile(pat, re.UNICODE)
269
            self.pat = pat
270

    
271
        if 'choices' in opts:
272
            opts['choices'] = dict((unicode(x), unicode(x))
273
                                    for x in opts['choices'])
274

    
275
    def _check(self, item):
276
        if not isinstance(item, unicode):
277
            # require non-unicode items to be utf8
278
            item = str(item)
279
            try:
280
                item = item.decode('utf8')
281
            except UnicodeDecodeError, e:
282
                item = item.decode('latin1')
283
                m = "%s: non-unicode '%s' is not utf8" % (self, shorts(item))
284
                raise CanonifyException(m)
285

    
286
        opts = self.opts
287
        if 'choices' in opts:
288
            choices = opts['choices']
289
            try:
290
                unknown = item not in choices
291
            except TypeError, e:
292
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
293
                raise CanonifyException(m, e)
294

    
295
            if unknown:
296
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
297
                raise CanonifyException(m)
298

    
299
            return choices[item]
300

    
301
        optget = opts.get
302
        itemlen = len(item)
303
        maxlen = optget('maxlen', None)
304
        if maxlen is not None and itemlen > maxlen:
305
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
306
            raise CanonifyException(m)
307

    
308
        minlen = optget('minlen', None)
309
        if minlen is not None and itemlen < minlen:
310
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
311
            raise CanonifyException(m)
312

    
313
        matcher = self.matcher
314
        if matcher is not None:
315
            match = matcher.match(item)
316
            if  (       match is None
317
                    or  (match.start(), match.end()) != (0, itemlen)    ):
318

    
319
                    m = ("%s: '%s' does not match '%s'"
320
                            % (self, shorts(item), self.pat))
321
                    raise CanonifyException(m)
322

    
323
        return item
324

    
325
    default_alphabet = '0123456789ฮฑฮฒฮณฮดฮตฮถ'.decode('utf8')
326

    
327
    def _random_choice(self, kw):
328
        opts = self.opts
329
        if 'regex' in opts:
330
            m = 'Unfortunately, random for regex strings not supported'
331
            raise ValueError(m)
332

    
333
        optget = opts.get
334
        kwget = kw.get
335
        minlen = kwget('minlen', optget('minlen', 0))
336
        maxlen = kwget('maxlen', optget('maxlen', 32))
337
        alphabet = kwget('alphabet', self.default_alphabet)
338
        z = maxlen - minlen
339
        if z < 1:
340
            z = 1
341

    
342
        g = log(z, 2)
343
        r = random() * g
344
        z = minlen + int(2**r)
345

    
346
        s = u''
347
        for _ in xrange(z):
348
            s += choice(alphabet)
349

    
350
        return s
351

    
352

    
353
class Bytes(Canonical):
354

    
355
    re = None
356
    matcher = None
357
    choices = None
358

    
359
    def init(self):
360
        opts = self.opts
361
        if 'regex' in opts:
362
            pat = opts['regex']
363
            re = self.re
364
            if re is None:
365
                import re
366
                self.re = re
367

    
368
            self.matcher = re.compile(pat)
369
            self.pat = pat
370

    
371
        if 'choices' in opts:
372
            opts['choices'] = dict((str(x), str(x))
373
                                    for x in opts['choices'])
374

    
375
    def _check(self, item):
376
        if isinstance(item, unicode):
377
            # convert unicode to utf8
378
            item = item.encode('utf8')
379

    
380
        opts = self.opts
381
        if 'choices' in opts:
382
            choices = opts['choices']
383
            try:
384
                unknown = item not in choices
385
            except TypeError, e:
386
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
387
                raise CanonifyException(m, e)
388

    
389
            if unknown:
390
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
391
                raise CanonifyException(m)
392

    
393
            return choices[item]
394

    
395
        optget = opts.get
396
        itemlen = len(item)
397
        maxlen = optget('maxlen', None)
398
        if maxlen is not None and itemlen > maxlen:
399
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
400
            raise CanonifyException(m)
401

    
402
        minlen = optget('minlen', None)
403
        if minlen is not None and itemlen < minlen:
404
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
405
            raise CanonifyException(m)
406

    
407
        matcher = self.matcher
408
        if matcher is not None:
409
            match = matcher.match(item)
410
            if  (       match is None
411
                    or  (match.start(), match.end()) != (0, itemlen)    ):
412

    
413
                    m = ("%s: '%s' does not match '%s'"
414
                            % (self, shorts(item), self.pat))
415
                    raise CanonifyException(m)
416

    
417
        return item
418

    
419
    default_alphabet = '0123456789abcdef'
420

    
421
    def _random_choice(self, kw):
422
        opts = self.opts
423
        if 'regex' in opts:
424
            m = 'Unfortunately, random for regex strings not supported'
425
            raise ValueError(m)
426

    
427
        optget = opts.get
428
        kwget = kw.get
429
        minlen = kwget('minlen', optget('minlen', 0))
430
        maxlen = kwget('maxlen', optget('maxlen', 32))
431
        alphabet = kwget('alphabet', self.default_alphabet)
432
        z = maxlen - minlen
433
        if z < 1:
434
            z = 1
435

    
436
        g = log(z, 2)
437
        r = random() * g
438
        z = minlen + int(2**r)
439

    
440
        s = u''
441
        for _ in xrange(z):
442
            s += choice(alphabet)
443

    
444
        return s
445

    
446

    
447
class ListOf(Canonical):
448

    
449
    def init(self):
450
        args = self.args
451
        kw = self.kw
452

    
453
        if not (args or kw):
454
            raise SpecifyException("ListOf requires one or more arguments")
455

    
456
        if args and kw:
457
            m = ("ListOf requires either positional "
458
                 "or keyword arguments, but not both")
459
            raise SpecifyException(m)
460

    
461
        if args:
462
            if len(args) > 1:
463
                self.canonical = Tuple(*args)
464
            else:
465
                self.canonical = args[0]
466
        else:
467
            self.canonical = Args(**kw)
468

    
469
    def _check(self, item):
470
        if item is None:
471
            item = ()
472

    
473
        try:
474
            items = iter(item)
475
        except TypeError, e:
476
            m = "%s: %s is not iterable" % (self, shorts(item))
477
            raise CanonifyException(m)
478

    
479
        canonical = self.canonical
480
        canonified = []
481
        append = canonified.append
482

    
483
        for item in items:
484
            item = canonical(item)
485
            append(item)
486

    
487
        if not canonified and self.opts.get('nonempty', False):
488
            m = "%s: must be nonempty" % (self,)
489
            raise CanonifyException(m)
490

    
491
        return canonified
492

    
493
    def _random_choice(self, kw):
494
        z = randint(1, 4)
495
        get_random = self.canonical.random
496

    
497
        return [get_random() for _ in xrange(z)]
498

    
499
    def _show(self):
500
        return '[ ' + self.canonical.show() + ' ... ]'
501

    
502
class Args(Canonical):
503

    
504
    def _unpack(self, item):
505
        arglist = argmap_unpack_dict(item)
506
        keys = self.kw.keys()
507
        arglen = len(arglist)
508
        if arglen != len(keys):
509
            m = "inconsistent number of parameters: %s != %s" % (
510
            arglen, len(keys))
511
            raise CanonifyException(m)
512

    
513
        position = 0
514
        named_args = OrderedDict()
515

    
516
        for k, v in arglist:
517
            if k is not None:
518
                named_args[k] = v
519
            else:
520
                # find the right position
521
                for i in range(position, arglen):
522
                    key = keys[i]
523
                    if not key in named_args.keys():
524
                       position = i + 1
525
                       break
526
                else:
527
                    m = "Formal arguments exhausted"
528
                    raise AssertionError(m)
529
                named_args[key] = v
530

    
531
        return named_args
532

    
533
    def _check(self, item):
534
        try:
535
            arglist = OrderedDict(item).items()
536
        except (TypeError, ValueError), e:
537
            m = "%s: %s is not dict-able" % (self, shorts(item))
538
            raise CanonifyException(m)
539

    
540
        canonified = OrderedDict()
541

    
542
        try:
543
            for n, c in self.kw.items():
544
                t = item[n] if n in item else None
545
                canonified[n] = c.check(t)
546
        except KeyError:
547
            m = ("%s: Argument '%s' not found in '%s'"
548
                 % (self, shorts(n), shorts(item)))
549
            raise CanonifyException(m)
550

    
551
        return canonified
552

    
553
    def _show(self):
554
        strings = [x for x in [c.show() for n, c in self.kw.items()] if x]
555
        return ' '.join(strings)
556

    
557
    def _random_choice(self, kw):
558
        args = {}
559
        for n, c in self.kw.items():
560
            args[n] = c.random()
561
        return args
562

    
563

    
564
class Tuple(Canonical):
565

    
566
    def _check(self, item):
567
        try:
568
            items = list(item)
569
        except TypeError, e:
570
            m = "%s: %s is not iterable" % (self, shorts(item))
571
            raise CanonifyException(m)
572

    
573
        canonicals = self.args
574
        zi = len(items)
575
        zc = len(canonicals)
576

    
577
        if zi != zc:
578
            m = "%s: expecting %d elements, not %d (%s)" % (self, zc, zi, str(items))
579
            raise CanonifyException(m)
580

    
581
        g = (canonical(element) for canonical, element in zip(self.args, item))
582

    
583
        return tuple(g)
584

    
585
    def __add__(self, other):
586
        oargs = other.args if isinstance(other, Tuple) else (other,)
587
        args = self.args + oargs
588
        return self.__class__(*args)
589

    
590
    def _random_choice(self, kw):
591
        return tuple(c.random() for c in self.args)
592

    
593
    def _show(self):
594
        canonicals = self.args
595
        strings = [x for x in [c.show() for c in canonicals] if x]
596
        return '[ ' + ' '.join(strings) + ' ]'
597

    
598
class Dict(Canonical):
599

    
600
    def _check(self, item):
601

    
602
        try:
603
            item = dict(item)
604
        except TypeError:
605
            m = "%s: '%s' is not dict-able" % (self, shorts(item))
606
            raise CanonifyException(m)
607

    
608
        canonified = {}
609
        canonical = self.kw
610

    
611
        for n, c in canonical.items():
612
            if n not in item:
613
                m = "%s: key '%s' not found" % (self, shorts(n))
614
                raise CanonifyException(m)
615
            canonified[n] = c(item[n])
616

    
617
        strict = self.opts.get('strict', True)
618
        if strict and len(item) != len(canonical):
619
            for k in sorted(item.keys()):
620
                if k not in canonical:
621
                    break
622

    
623
            m = "%s: unexpected key '%s' (strict mode)" % (self, shorts(k))
624
            raise CanonifyException(m)
625

    
626
        return canonified
627

    
628
    def _random_choice(self, kw):
629
        item = {}
630
        for n, c in self.kw.items():
631
            item[n] = c.random()
632

    
633
        return item
634

    
635

    
636
class Canonifier(object):
637
    def __init__(self, name, input_canonicals, output_canonicals, doc_strings):
638
        self.name = name
639
        self.input_canonicals = dict(input_canonicals)
640
        self.output_canonicals = dict(output_canonicals)
641
        self.doc_strings = dict(doc_strings)
642

    
643
    def call_names(self):
644
        return self.input_canonicals.keys()
645

    
646
    def call_docs(self):
647
        get_input_canonical = self.input_canonical
648
        for call_name, call_doc in self.doc_strings.iteritems():
649
            if not call_doc:
650
                canonical = get_input_canonical(call_name)
651
                call_doc = canonical.tostring(showopts=1, multiline=1)
652
            yield call_name, call_doc
653

    
654
    def get_doc(self, name):
655
        doc_strings = self.doc_strings
656
        if name not in doc_strings:
657
            m = "%s: Invalid method name '%s'" % (self.name, name)
658
            raise CanonifyException(m)
659

    
660
        docstring = doc_strings[name]
661
        if not docstring:
662
            docstring = self.input_canonical(name).tostring()
663
        return docstring
664

    
665
    def call_attrs(self):
666
        for call_name, canonical in self.input_canonicals.iteritems():
667
            yield call_name, canonical.tostring(showopts=1, multiline=1)
668

    
669
    def input_canonical(self, name):
670
        input_canonicals = self.input_canonicals
671
        if name not in input_canonicals:
672
            m = "%s: Invalid input call '%s'" % (self.name, name)
673
            raise CanonifyException(m)
674

    
675
        return input_canonicals[name]
676

    
677
    def canonify_input(self, name, the_input):
678
        return self.input_canonical(name)(the_input)
679

    
680
    def output_canonical(self, name):
681
        output_canonicals = self.output_canonicals
682
        if name not in output_canonicals:
683
            m = "%s: Output canonical '%s' does not exist" % (self.name, name)
684
            raise CanonifyException(m)
685

    
686
        return output_canonicals[name]
687

    
688
    def canonify_output(self, name, the_output):
689
        return self.output_canonical(name)(the_output)
690

    
691
    def show_input_canonical(self, name):
692
        return self.input_canonical(name).show()
693

    
694
    def parse(self, method, arglist):
695
        args, rest = argmap_decode(arglist)
696
        return self.input_canonical(method).check(args)
697

    
698

    
699
class Specificator(object):
700

    
701
    def __new__(cls):
702
        if cls is Specificator:
703
            m = "Specificator classes must be subclassed"
704
            raise SpecifyException(m)
705

    
706
        import inspect
707

    
708
        canonical_inputs = {}
709
        canonical_outputs = {}
710
        doc_strings = {}
711

    
712
        for name in dir(cls):
713
            f = getattr(cls, name)
714
            if not inspect.ismethod(f) or f.__name__.startswith('_'):
715
                continue
716

    
717
            doc_strings[name] = f.__doc__
718
            argspec = inspect.getargspec(f)
719
            defaults = argspec.defaults
720
            args = argspec.args
721
            if args and args[0] == 'self':
722
                args = args[1:]
723

    
724
            if not defaults:
725
                defaults = ()
726

    
727
            arglen = len(args)
728
            deflen = len(defaults)
729

    
730
            if arglen != deflen:
731
                a = (f.__name__, args[:arglen-deflen])
732
                m = "Unspecified arguments in '%s': %s" % a
733
                raise SpecifyException(m)
734

    
735
            args = zip(args, defaults)
736
            for a, c in args:
737
                if not isinstance(c, Canonical):
738
                    m = ("argument '%s=%s' is not an instance of 'Canonical'"
739
                         % (a, repr(c)))
740
                    raise SpecifyException(m)
741

    
742
            canonical = Null() if len(args) == 0 else Args(*args)
743
            canonical_inputs[name] = canonical
744

    
745
            self = object.__new__(cls)
746
            canonical = f(self)
747
            if not isinstance(canonical, Canonical):
748
                m = ("method '%s' does not return a Canonical, but a(n) %s "
749
                                                    % (name, type(canonical)))
750
                raise SpecifyException(m)
751
            canonical_outputs[name] = canonical
752

    
753
        return Canonifier(cls.__name__, canonical_inputs, canonical_outputs,
754
                          doc_strings)
755

    
756
    def __call__(self):
757
        return self
758