Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / commissioning / specificator.py @ 6764f588

History | View | Annotate | Download (22 kB)

1
# -*- coding: utf-8 -*-
2
# Copyright 2012 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or
5
# without modification, are permitted provided that the following
6
# conditions are met:
7
#
8
#   1. Redistributions of source code must retain the above
9
#      copyright notice, this list of conditions and the following
10
#      disclaimer.
11
#
12
#   2. Redistributions in binary form must reproduce the above
13
#      copyright notice, this list of conditions and the following
14
#      disclaimer in the documentation and/or other materials
15
#      provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
# POSSIBILITY OF SUCH DAMAGE.
29
#
30
# The views and conclusions contained in the software and
31
# documentation are those of the authors and should not be
32
# interpreted as representing official policies, either expressed
33
# or implied, of GRNET S.A.
34

    
35
from random import random, choice, randint
36
from math import log
37
from inspect import isclass
38
from kamaki.clients.commissioning.utils.argmap import (
39
    argmap_decode,
40
    argmap_check,
41
    argmap_unpack_dict,
42
    argmap_unpack_list)
43

    
44
try:
45
    from collections import OrderedDict
46
except ImportError:
47
    from kamaki.clients.commissioning.utils.ordereddict import OrderedDict
48

    
49

    
50
def shorts(s):
51
    if not isinstance(s, unicode):
52
        s = str(s)
53

    
54
    if len(s) <= 64:
55
        return s
56

    
57
    return s[:61] + '...'
58

    
59

    
60
class CanonifyException(Exception):
61
    pass
62

    
63

    
64
class SpecifyException(Exception):
65
    pass
66

    
67

    
68
class Canonical(object):
69

    
70
    _random_choice = None
71

    
72
    def __init__(self, *args, **kw):
73
        self.args = []
74
        named_args = []
75
        for a in args:
76
            if isinstance(a, tuple) and len(a) == 2:
77
                named_args.append(a)
78
            else:
79
                self.args.append(a)
80
        ordered_dict = OrderedDict(named_args)
81

    
82
        self.name = kw.pop('classname', self.__class__.__name__)
83
        random_choice = kw.pop('random', None)
84
        if random_choice is not None:
85
            self.random_choice = random_choice
86
        opts = {}
87
        for k, v in kw.items():
88
            if not isinstance(v, Canonical):
89
                if isclass(v) and issubclass(v, Canonical):
90
                    m = ("argument '%s': value '%s' is a Canonical _class_. "
91
                         "Perhaps you meant to specify a Canonical _instance_"
92
                         % (k, v))
93
                    raise SpecifyException(m)
94
                opts[k] = v
95
                del kw[k]
96

    
97
        self.opts = opts
98
        ordered_dict.update(kw)
99
        self.kw = ordered_dict
100
        self.init()
101

    
102
        if 'default' in opts:
103
            item = opts['default']
104
            if item is None:
105
                opts['null'] = 1
106
            else:
107
                opts['default'] = self._check(item)
108

    
109
    def init(self):
110
        return
111

    
112
    def __call__(self, item):
113
        return self.check(item)
114

    
115
    def check(self, item):
116
        if argmap_check(item):
117
            item = self._unpack(item)
118

    
119
        opts = self.opts
120
        if item is None and 'default' in opts:
121
            item = opts['default']
122

    
123
        can_be_null = opts.get('null', False)
124
        if item is None and can_be_null:
125
            return None
126

    
127
        return self._check(item)
128

    
129
    def _check(self, item):
130
        return item
131

    
132
    def _unpack(self, item):
133
        return argmap_unpack_list(item)
134

    
135
    def create(self):
136
        return None
137

    
138
    def random(self, **kw):
139
        random_choice = self._random_choice
140
        if random_choice is None:
141
            return None
142

    
143
        if callable(random_choice):
144
            return random_choice(kw)
145

    
146
        if isinstance(random_choice, str):
147
            return getattr(self, random_choice)(kw)
148

    
149
        return choice(random_choice)
150

    
151
    def tostring(self, depth=0, showopts=0, multiline=0):
152
        depth += 1
153
        if not multiline:
154
            argdepth = ''
155
            owndepth = ''
156
            joinchar = ','
157
            padchar = ''
158
        else:
159
            argdepth = '    ' * depth
160
            owndepth = '    ' * (depth - 1)
161
            joinchar = ',\n'
162
            padchar = '\n'
163

    
164
        args = [a.tostring(
165
                    depth=depth,
166
                    showopts=showopts,
167
                    multiline=multiline) for a in self.args]
168
        args += [("%s=%s" % (k, v.tostring(
169
                                depth=depth,
170
                                showopts=showopts,
171
                                multiline=multiline)))
172
                            for k, v in self.kw.items()]
173
        if showopts:
174
            args += [("%s=%s" % (k, str(v))) for k, v in self.opts.items()]
175

    
176
        if len(args) == 0:
177
            string = "%s(%s)" % (self.name, ','.join(args))
178
        else:
179
            string = "%s(%s" % (self.name, padchar)
180
            for arg in args:
181
                string += argdepth + arg + joinchar
182
            string = string[:-1] + padchar
183
            string += owndepth + ")"
184

    
185
        return string
186

    
187
    __str__ = tostring
188

    
189
    def __repr__(self):
190
        return self.tostring(multiline=0, showopts=1)
191

    
192
    def show(self):
193
        showable = self.opts.get('show', True)
194
        return self._show() if showable else ''
195

    
196
    def _show(self):
197
        return self.name
198

    
199

    
200
class Null(Canonical):
201

    
202
    def _check(self, item):
203
        return None
204

    
205
Nothing = Null()
206

    
207

    
208
class Integer(Canonical):
209

    
210
    def _check(self, item):
211
        try:
212
            num = long(item)
213
        except ValueError:
214
            try:
215
                num = long(item, 16)
216
            except Exception:
217
                m = "%s: cannot convert '%s' to long" % (self, shorts(item))
218
                raise CanonifyException(m)
219
        except TypeError:
220
            m = "%s: cannot convert '%s' to long" % (self, shorts(item))
221
            raise CanonifyException(m)
222

    
223
        optget = self.opts.get
224
        minimum = optget('minimum', None)
225
        maximum = optget('maximum', None)
226

    
227
        if minimum is not None and num < minimum:
228
            m = "%s: %d < minimum=%d" % (self, num, minimum)
229
            raise CanonifyException(m)
230

    
231
        if maximum is not None and num > maximum:
232
            m = "%s: %d > maximum=%d" % (self, num, maximum)
233
            raise CanonifyException(m)
234

    
235
        return num
236

    
237
    def _random_choice(self, kw):
238
        optget = self.opts.get
239
        kwget = kw.get
240
        minimum = kwget('minimum', optget('minimum', -4294967296L))
241
        maximum = kwget('maximum', optget('maximum', 4294967295L))
242
        r = random()
243
        if r < 0.1:
244
            return minimum
245
        if r < 0.2:
246
            return maximum
247
        if minimum <= 0 and maximum >= 0 and r < 0.3:
248
            return 0L
249
        return long(minimum + r * (maximum - minimum))
250

    
251

    
252
Serial = Integer(classname='Serial', null=True)
253

    
254

    
255
class Text(Canonical):
256

    
257
    re = None
258
    matcher = None
259
    choices = None
260

    
261
    def init(self):
262
        opts = self.opts
263
        if 'regex' in opts:
264
            pat = opts['regex']
265
            re = self.re
266
            if re is None:
267
                import re
268
                self.re = re
269

    
270
            self.matcher = re.compile(pat, re.UNICODE)
271
            self.pat = pat
272

    
273
        if 'choices' in opts:
274
            opts['choices'] = dict((unicode(x), unicode(x))
275
                                    for x in opts['choices'])
276

    
277
    def _check(self, item):
278
        if not isinstance(item, unicode):
279
            # require non-unicode items to be utf8
280
            item = str(item)
281
            try:
282
                item = item.decode('utf8')
283
            except UnicodeDecodeError, e:
284
                item = item.decode('latin1')
285
                m = "%s: non-unicode '%s' is not utf8" % (self, shorts(item))
286
                raise CanonifyException(m)
287

    
288
        opts = self.opts
289
        if 'choices' in opts:
290
            choices = opts['choices']
291
            try:
292
                unknown = item not in choices
293
            except TypeError, e:
294
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
295
                raise CanonifyException(m, e)
296

    
297
            if unknown:
298
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
299
                raise CanonifyException(m)
300

    
301
            return choices[item]
302

    
303
        optget = opts.get
304
        itemlen = len(item)
305
        maxlen = optget('maxlen', None)
306
        if maxlen is not None and itemlen > maxlen:
307
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
308
            raise CanonifyException(m)
309

    
310
        minlen = optget('minlen', None)
311
        if minlen is not None and itemlen < minlen:
312
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
313
            raise CanonifyException(m)
314

    
315
        matcher = self.matcher
316
        if matcher is not None:
317
            match = matcher.match(item)
318
            if ((not match) or (match.start(), match.end()) != (0, itemlen)):
319

    
320
                    m = ("%s: '%s' does not match '%s'"
321
                            % (self, shorts(item), self.pat))
322
                    raise CanonifyException(m)
323

    
324
        return item
325

    
326
    default_alphabet = '0123456789ฮฑฮฒฮณฮดฮตฮถ'.decode('utf8')
327

    
328
    def _random_choice(self, kw):
329
        opts = self.opts
330
        if 'regex' in opts:
331
            m = 'Unfortunately, random for regex strings not supported'
332
            raise ValueError(m)
333

    
334
        optget = opts.get
335
        kwget = kw.get
336
        minlen = kwget('minlen', optget('minlen', 0))
337
        maxlen = kwget('maxlen', optget('maxlen', 32))
338
        alphabet = kwget('alphabet', self.default_alphabet)
339
        z = maxlen - minlen
340
        if z < 1:
341
            z = 1
342

    
343
        g = log(z, 2)
344
        r = random() * g
345
        z = minlen + int(2 ** r)
346

    
347
        s = u''
348
        for _ in xrange(z):
349
            s += choice(alphabet)
350

    
351
        return s
352

    
353

    
354
class Bytes(Canonical):
355

    
356
    re = None
357
    matcher = None
358
    choices = None
359

    
360
    def init(self):
361
        opts = self.opts
362
        if 'regex' in opts:
363
            pat = opts['regex']
364
            re = self.re
365
            if re is None:
366
                import re
367
                self.re = re
368

    
369
            self.matcher = re.compile(pat)
370
            self.pat = pat
371

    
372
        if 'choices' in opts:
373
            opts['choices'] = dict((str(x), str(x))
374
                                    for x in opts['choices'])
375

    
376
    def _check(self, item):
377
        if isinstance(item, unicode):
378
            # convert unicode to utf8
379
            item = item.encode('utf8')
380

    
381
        opts = self.opts
382
        if 'choices' in opts:
383
            choices = opts['choices']
384
            try:
385
                unknown = item not in choices
386
            except TypeError, e:
387
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
388
                raise CanonifyException(m, e)
389

    
390
            if unknown:
391
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
392
                raise CanonifyException(m)
393

    
394
            return choices[item]
395

    
396
        optget = opts.get
397
        itemlen = len(item)
398
        maxlen = optget('maxlen', None)
399
        if maxlen is not None and itemlen > maxlen:
400
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
401
            raise CanonifyException(m)
402

    
403
        minlen = optget('minlen', None)
404
        if minlen is not None and itemlen < minlen:
405
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
406
            raise CanonifyException(m)
407

    
408
        matcher = self.matcher
409
        if matcher is not None:
410
            match = matcher.match(item)
411
            if ((not match) or (match.start(), match.end()) != (0, itemlen)):
412
                    m = ("%s: '%s' does not match '%s'"
413
                            % (self, shorts(item), self.pat))
414
                    raise CanonifyException(m)
415

    
416
        return item
417

    
418
    default_alphabet = '0123456789abcdef'
419

    
420
    def _random_choice(self, kw):
421
        opts = self.opts
422
        if 'regex' in opts:
423
            m = 'Unfortunately, random for regex strings not supported'
424
            raise ValueError(m)
425

    
426
        optget = opts.get
427
        kwget = kw.get
428
        minlen = kwget('minlen', optget('minlen', 0))
429
        maxlen = kwget('maxlen', optget('maxlen', 32))
430
        alphabet = kwget('alphabet', self.default_alphabet)
431
        z = maxlen - minlen
432
        if z < 1:
433
            z = 1
434

    
435
        g = log(z, 2)
436
        r = random() * g
437
        z = minlen + int(2 ** r)
438

    
439
        s = u''
440
        for _ in xrange(z):
441
            s += choice(alphabet)
442

    
443
        return s
444

    
445

    
446
class ListOf(Canonical):
447

    
448
    def init(self):
449
        args = self.args
450
        kw = self.kw
451

    
452
        if not (args or kw):
453
            raise SpecifyException("ListOf requires one or more arguments")
454

    
455
        if args and kw:
456
            m = ("ListOf requires either positional "
457
                 "or keyword arguments, but not both")
458
            raise SpecifyException(m)
459

    
460
        if args:
461
            if len(args) > 1:
462
                self.canonical = Tuple(*args)
463
            else:
464
                self.canonical = args[0]
465
        else:
466
            self.canonical = Args(**kw)
467

    
468
    def _check(self, item):
469
        if item is None:
470
            item = ()
471

    
472
        try:
473
            items = iter(item)
474
        except TypeError:
475
            m = "%s: %s is not iterable" % (self, shorts(item))
476
            raise CanonifyException(m)
477

    
478
        canonical = self.canonical
479
        canonified = []
480
        append = canonified.append
481

    
482
        for item in items:
483
            item = canonical(item)
484
            append(item)
485

    
486
        if not canonified and self.opts.get('nonempty', False):
487
            m = "%s: must be nonempty" % (self,)
488
            raise CanonifyException(m)
489

    
490
        return canonified
491

    
492
    def _random_choice(self, kw):
493
        z = randint(1, 4)
494
        get_random = self.canonical.random
495

    
496
        return [get_random() for _ in xrange(z)]
497

    
498
    def _show(self):
499
        return '[ ' + self.canonical.show() + ' ... ]'
500

    
501

    
502
class Args(Canonical):
503

    
504
    def _unpack(self, item):
505
        arglist = argmap_unpack_dict(item)
506
        keys = self.kw.keys()
507
        arglen = len(arglist)
508
        if arglen != len(keys):
509
            m = "inconsistent number of parameters: %s != %s" % (
510
            arglen, len(keys))
511
            raise CanonifyException(m)
512

    
513
        position = 0
514
        named_args = OrderedDict()
515

    
516
        for k, v in arglist:
517
            if k is not None:
518
                named_args[k] = v
519
            else:
520
                # find the right position
521
                for i in range(position, arglen):
522
                    key = keys[i]
523
                    if not key in named_args.keys():
524
                        position = i + 1
525
                        break
526
                else:
527
                    m = "Formal arguments exhausted"
528
                    raise AssertionError(m)
529
                named_args[key] = v
530

    
531
        return named_args
532

    
533
    def _check(self, item):
534
        try:
535
            OrderedDict(item).items()
536
        except (TypeError, ValueError):
537
            m = "%s: %s is not dict-able" % (self, shorts(item))
538
            raise CanonifyException(m)
539

    
540
        canonified = OrderedDict()
541

    
542
        try:
543
            for n, c in self.kw.items():
544
                t = item[n] if n in item else None
545
                canonified[n] = c.check(t)
546
        except KeyError:
547
            m = ("%s: Argument '%s' not found in '%s'"
548
                 % (self, shorts(n), shorts(item)))
549
            raise CanonifyException(m)
550

    
551
        return canonified
552

    
553
    def _show(self):
554
        strings = [x for x in [c.show() for n, c in self.kw.items()] if x]
555
        return ' '.join(strings)
556

    
557
    def _random_choice(self, kw):
558
        args = {}
559
        for n, c in self.kw.items():
560
            args[n] = c.random()
561
        return args
562

    
563

    
564
class Tuple(Canonical):
565

    
566
    def _check(self, item):
567
        try:
568
            items = list(item)
569
        except TypeError:
570
            m = "%s: %s is not iterable" % (self, shorts(item))
571
            raise CanonifyException(m)
572

    
573
        canonicals = self.args
574
        zi = len(items)
575
        zc = len(canonicals)
576

    
577
        if zi != zc:
578
            m = "%s: expecting %d elements, not %d (%s)" % (
579
                self,
580
                zc,
581
                zi,
582
                str(items))
583
            raise CanonifyException(m)
584

    
585
        g = (canonical(element) for canonical, element in zip(self.args, item))
586

    
587
        return tuple(g)
588

    
589
    def __add__(self, other):
590
        oargs = other.args if isinstance(other, Tuple) else (other,)
591
        args = self.args + oargs
592
        return self.__class__(*args)
593

    
594
    def _random_choice(self, kw):
595
        return tuple(c.random() for c in self.args)
596

    
597
    def _show(self):
598
        canonicals = self.args
599
        strings = [x for x in [c.show() for c in canonicals] if x]
600
        return '[ ' + ' '.join(strings) + ' ]'
601

    
602

    
603
class Dict(Canonical):
604

    
605
    def _check(self, item):
606

    
607
        try:
608
            item = dict(item)
609
        except TypeError:
610
            m = "%s: '%s' is not dict-able" % (self, shorts(item))
611
            raise CanonifyException(m)
612

    
613
        canonified = {}
614
        canonical = self.kw
615

    
616
        for n, c in canonical.items():
617
            if n not in item:
618
                m = "%s: key '%s' not found" % (self, shorts(n))
619
                raise CanonifyException(m)
620
            canonified[n] = c(item[n])
621

    
622
        strict = self.opts.get('strict', True)
623
        if strict and len(item) != len(canonical):
624
            for k in sorted(item.keys()):
625
                if k not in canonical:
626
                    break
627

    
628
            m = "%s: unexpected key '%s' (strict mode)" % (self, shorts(k))
629
            raise CanonifyException(m)
630

    
631
        return canonified
632

    
633
    def _random_choice(self, kw):
634
        item = {}
635
        for n, c in self.kw.items():
636
            item[n] = c.random()
637

    
638
        return item
639

    
640

    
641
class Canonifier(object):
642
    def __init__(self, name, input_canonicals, output_canonicals, doc_strings):
643
        self.name = name
644
        self.input_canonicals = dict(input_canonicals)
645
        self.output_canonicals = dict(output_canonicals)
646
        self.doc_strings = dict(doc_strings)
647

    
648
    def call_names(self):
649
        return self.input_canonicals.keys()
650

    
651
    def call_docs(self):
652
        get_input_canonical = self.input_canonical
653
        for call_name, call_doc in self.doc_strings.iteritems():
654
            if not call_doc:
655
                canonical = get_input_canonical(call_name)
656
                call_doc = canonical.tostring(showopts=1, multiline=1)
657
            yield call_name, call_doc
658

    
659
    def get_doc(self, name):
660
        doc_strings = self.doc_strings
661
        if name not in doc_strings:
662
            m = "%s: Invalid method name '%s'" % (self.name, name)
663
            raise CanonifyException(m)
664

    
665
        docstring = doc_strings[name]
666
        if not docstring:
667
            docstring = self.input_canonical(name).tostring()
668
        return docstring
669

    
670
    def call_attrs(self):
671
        for call_name, canonical in self.input_canonicals.iteritems():
672
            yield call_name, canonical.tostring(showopts=1, multiline=1)
673

    
674
    def input_canonical(self, name):
675
        input_canonicals = self.input_canonicals
676
        if name not in input_canonicals:
677
            m = "%s: Invalid input call '%s'" % (self.name, name)
678
            raise CanonifyException(m)
679

    
680
        return input_canonicals[name]
681

    
682
    def canonify_input(self, name, the_input):
683
        return self.input_canonical(name)(the_input)
684

    
685
    def output_canonical(self, name):
686
        output_canonicals = self.output_canonicals
687
        if name not in output_canonicals:
688
            m = "%s: Output canonical '%s' does not exist" % (self.name, name)
689
            raise CanonifyException(m)
690

    
691
        return output_canonicals[name]
692

    
693
    def canonify_output(self, name, the_output):
694
        return self.output_canonical(name)(the_output)
695

    
696
    def show_input_canonical(self, name):
697
        return self.input_canonical(name).show()
698

    
699
    def parse(self, method, arglist):
700
        args, rest = argmap_decode(arglist)
701
        return self.input_canonical(method).check(args)
702

    
703

    
704
class Specificator(object):
705

    
706
    def __new__(cls):
707
        if cls is Specificator:
708
            m = "Specificator classes must be subclassed"
709
            raise SpecifyException(m)
710

    
711
        import inspect
712

    
713
        canonical_inputs = {}
714
        canonical_outputs = {}
715
        doc_strings = {}
716

    
717
        for name in dir(cls):
718
            f = getattr(cls, name)
719
            if not inspect.ismethod(f) or f.__name__.startswith('_'):
720
                continue
721

    
722
            doc_strings[name] = f.__doc__
723
            argspec = inspect.getargspec(f)
724
            defaults = argspec.defaults
725
            args = argspec.args
726
            if args and args[0] == 'self':
727
                args = args[1:]
728

    
729
            if not defaults:
730
                defaults = ()
731

    
732
            arglen = len(args)
733
            deflen = len(defaults)
734

    
735
            if arglen != deflen:
736
                a = (f.__name__, args[:arglen - deflen])
737
                m = "Unspecified arguments in '%s': %s" % a
738
                raise SpecifyException(m)
739

    
740
            args = zip(args, defaults)
741
            for a, c in args:
742
                if not isinstance(c, Canonical):
743
                    m = ("argument '%s=%s' is not an instance of 'Canonical'"
744
                         % (a, repr(c)))
745
                    raise SpecifyException(m)
746

    
747
            canonical = Null() if len(args) == 0 else Args(*args)
748
            canonical_inputs[name] = canonical
749

    
750
            self = object.__new__(cls)
751
            canonical = f(self)
752
            if not isinstance(canonical, Canonical):
753
                m = ("method '%s' does not return a Canonical, but a(n) %s "
754
                                                    % (name, type(canonical)))
755
                raise SpecifyException(m)
756
            canonical_outputs[name] = canonical
757

    
758
        return Canonifier(cls.__name__, canonical_inputs, canonical_outputs,
759
                          doc_strings)
760

    
761
    def __call__(self):
762
        return self