Statistics
| Branch: | Tag: | Revision:

root / commissioning / api / specificator.py @ 9c50a5ac

History | View | Annotate | Download (17.3 kB)

1
# -*- coding: utf8 -*- 
2
from random import random, choice, randint
3
from math import log
4

    
5
def shorts(s):
6
    if not isinstance(s, unicode):
7
        s = str(s)
8

    
9
    if len(s) <= 64:
10
        return s
11

    
12
    return s[:61] + '...'
13
        
14

    
15
class CanonifyException(Exception):
16
    pass
17

    
18
class SpecifyException(Exception):
19
    pass
20

    
21

    
22
class Canonical(object):
23

    
24
    random_choice = None
25

    
26
    def __init__(self, *args, **kw):
27
        self.args = args
28
        self.kw = kw
29
        self.name = kw.pop('classname', self.__class__.__name__)
30
        random_choice = kw.pop('random', None)
31
        if random_choice is not None:
32
            self.random_choice = random_choice
33
        opts = {}
34
        for k, v in kw.items():
35
            if not isinstance(v, Canonical):
36
                opts[k] = v
37
                del kw[k]
38

    
39
        self.opts = opts
40
        self.init()
41

    
42
        if 'default' in opts:
43
            item = opts['default']
44
            if item is None:
45
                opts['null'] = 1
46
            else:
47
                opts['default'] = self.check(item)
48

    
49
    def init(self):
50
        return
51

    
52
    def __call__(self, item):
53
        opts = self.opts
54
        if item is None and 'default' in opts:
55
            item = opts['default']
56

    
57
        can_be_null = opts.get('null', False)
58
        if item is None and can_be_null:
59
            return None
60

    
61
        return self.check(item)
62

    
63
    def check(self, item):
64
        return item
65

    
66
    def create(self):
67
        return None
68

    
69
    def random(self, **kw):
70
        random_choice = self.random_choice
71
        if random_choice is None:
72
            return None
73

    
74
        if callable(random_choice):
75
            return random_choice(kw)
76

    
77
        if isinstance(random_choice, str):
78
            return getattr(self, random_choice)(kw)
79

    
80
        return choice(random_choice)
81

    
82
    def tostring(self, depth=0, showopts=0, multiline=0):
83
        depth += 1
84
        if not multiline:
85
            argdepth = ''
86
            owndepth = ''
87
            joinchar = ','
88
            padchar = ''
89
        else:
90
            argdepth = '    ' * depth
91
            owndepth = '    ' * (depth - 1)
92
            joinchar = ',\n'
93
            padchar = '\n'
94

    
95
        args = [a.tostring( depth=depth,
96
                            showopts=showopts,
97
                            multiline=multiline) for a in self.args]
98
        args += [("%s=%s" %
99
                    (k, v.tostring( depth=depth,
100
                                    showopts=showopts,
101
                                    multiline=multiline)))
102
                                    
103
                                    for k, v in self.kw.items()]
104
        if showopts:
105
            args += [("%s=%s" % (k, str(v))) for k, v in self.opts.items()]
106

    
107
        if len(args) == 0:
108
            string = "%s(%s)" % (self.name, ','.join(args))
109
        else:
110
            string = "%s(%s" % (self.name, padchar)
111
            for arg in args:
112
                string += argdepth + arg + joinchar
113
            string = string[:-1] + padchar
114
            string += owndepth + ")"
115

    
116
        return string
117

    
118
    __str__ = tostring
119

    
120
    def __repr__(self):
121
        return self.tostring(multiline=0, showopts=1)
122

    
123
    def check(item):
124
        canonified = item
125
        return canonified
126

    
127

    
128
class Null(Canonical):
129

    
130
    def check(self, item):
131
        return None
132

    
133
Nothing = Null()
134

    
135

    
136
class Integer(Canonical):
137

    
138
    def check(self, item):
139
        try:
140
            num = long(item)
141
        except ValueError, e:
142
            try:
143
                num = long(item, 16)
144
            except Exception:
145
                m = "%s: cannot convert '%s' to long" % (self, shorts(item))
146
                raise CanonifyException(m)
147

    
148
        optget = self.opts.get
149
        minimum = optget('minimum', None)
150
        maximum = optget('maximum', None)
151

    
152
        if minimum is not None and num < minimum:
153
            m = "%s: %d < minimum=%d" % (self, num, minimum)
154
            raise CanonifyException(m)
155

    
156
        if maximum is not None and num > maximum:
157
            m = "%s: %d > maximum=%d" % (self, num, maximum)
158
            raise CanonifyException(m)
159

    
160
        return num
161

    
162
    def random_integer(self, kw):
163
        optget = self.opts.get
164
        kwget = kw.get
165
        minimum = kwget('minimum', optget('minimum', -4294967296L))
166
        maximum = kwget('maximum', optget('maximum', 4294967295L))
167
        r = random()
168
        if r < 0.1:
169
            return minimum
170
        if r < 0.2:
171
            return maximum
172
        if minimum <= 0 and maximum >= 0 and r < 0.3:
173
            return 0L
174
        return long(minimum + r * (maximum - minimum))
175

    
176
    random_choice = random_integer
177

    
178

    
179

    
180
Serial = Integer(
181
            classname   =   'Serial',
182
            null        =   True,
183
)
184

    
185

    
186
class Text(Canonical):
187

    
188
    re = None
189
    matcher = None
190
    choices = None
191

    
192
    def init(self):
193
        opts = self.opts
194
        if 'regex' in opts:
195
            pat = opts['regex']
196
            re = self.re
197
            if re is None:
198
                import re
199
                self.re = re
200

    
201
            self.matcher = re.compile(pat, re.UNICODE)
202
            self.pat = pat
203

    
204
        if 'choices' in opts:
205
            opts['choices'] = dict((unicode(x), unicode(x))
206
                                    for x in opts['choices'])
207

    
208
    def check(self, item):
209
        if not isinstance(item, unicode):
210
            # require non-unicode items to be utf8
211
            item = str(item)
212
            try:
213
                item = item.decode('utf8')
214
            except UnicodeDecodeError, e:
215
                item = item.decode('latin1')
216
                m = "%s: non-unicode '%s' is not utf8" % (self, shorts(item))
217
                raise CanonifyException(m)
218

    
219
        opts = self.opts
220
        if 'choices' in opts:
221
            choices = opts['choices']
222
            try:
223
                unknown = item not in choices
224
            except TypeError, e:
225
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
226
                raise CanonifyException(m, e)
227

    
228
            if unknown:
229
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
230
                raise CanonifyException(m)
231

    
232
            return choices[item]
233

    
234
        optget = opts.get
235
        itemlen = len(item)
236
        maxlen = optget('maxlen', None)
237
        if maxlen is not None and itemlen > maxlen:
238
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
239
            raise CanonifyException(m)
240

    
241
        minlen = optget('minlen', None)
242
        if minlen is not None and itemlen < minlen:
243
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
244
            raise CanonifyException(m)
245
            
246
        matcher = self.matcher
247
        if matcher is not None:
248
            match = matcher.match(item)
249
            if  (       match is None
250
                    or  (match.start(), match.end()) != (0, itemlen)    ):
251

    
252
                    m = ("%s: '%s' does not match '%s'"
253
                            % (self, shorts(item), self.pat))
254
                    raise CanonifyException(m)
255

    
256
        return item
257

    
258
    default_alphabet = '0123456789αβγδεζ'.decode('utf8')
259

    
260
    def random_string(self, kw):
261
        opts = self.opts
262
        if 'regex' in opts:
263
            m = 'Unfortunately, random for regex strings not supported'
264
            raise ValueError(m)
265

    
266
        optget = opts.get
267
        kwget = kw.get
268
        minlen = kwget('minlen', optget('minlen', 0))
269
        maxlen = kwget('maxlen', optget('maxlen', 32))
270
        alphabet = kwget('alphabet', self.default_alphabet)
271
        z = maxlen - minlen
272
        if z < 1:
273
            z = 1
274

    
275
        g = log(z, 2)
276
        r = random() * g
277
        z = minlen + int(2**r)
278

    
279
        s = u''
280
        for _ in xrange(z):
281
            s += choice(alphabet)
282

    
283
        return s
284

    
285
    random_choice = random_string
286

    
287

    
288
class Bytes(Canonical):
289

    
290
    re = None
291
    matcher = None
292
    choices = None
293

    
294
    def init(self):
295
        opts = self.opts
296
        if 'regex' in opts:
297
            pat = opts['regex']
298
            re = self.re
299
            if re is None:
300
                import re
301
                self.re = re
302

    
303
            self.matcher = re.compile(pat)
304
            self.pat = pat
305

    
306
        if 'choices' in opts:
307
            opts['choices'] = dict((str(x), str(x))
308
                                    for x in opts['choices'])
309

    
310
    def check(self, item):
311
        if isinstance(item, unicode):
312
            # convert unicode to utf8
313
            item = item.encode('utf8')
314

    
315
        opts = self.opts
316
        if 'choices' in opts:
317
            choices = opts['choices']
318
            try:
319
                unknown = item not in choices
320
            except TypeError, e:
321
                m = "%s: unhashable type '%s'" % (self.name, shorts(item))
322
                raise CanonifyException(m, e)
323

    
324
            if unknown:
325
                m = "%s: '%s' not in choices" % (self.name, shorts(item))
326
                raise CanonifyException(m)
327

    
328
            return choices[item]
329

    
330
        optget = opts.get
331
        itemlen = len(item)
332
        maxlen = optget('maxlen', None)
333
        if maxlen is not None and itemlen > maxlen:
334
            m = "%s: len('%s') > maxlen=%d" % (self, shorts(item), maxlen)
335
            raise CanonifyException(m)
336

    
337
        minlen = optget('minlen', None)
338
        if minlen is not None and itemlen < minlen:
339
            m = "%s: len('%s') < minlen=%d" % (self, shorts(item), minlen)
340
            raise CanonifyException(m)
341
            
342
        matcher = self.matcher
343
        if matcher is not None:
344
            match = matcher.match(item)
345
            if  (       match is None
346
                    or  (match.start(), match.end()) != (0, itemlen)    ):
347

    
348
                    m = ("%s: '%s' does not match '%s'"
349
                            % (self, shorts(item), self.pat))
350
                    raise CanonifyException(m)
351

    
352
        return item
353

    
354
    default_alphabet = '0123456789abcdef'
355

    
356
    def random_bytes(self, kw):
357
        opts = self.opts
358
        if 'regex' in opts:
359
            m = 'Unfortunately, random for regex strings not supported'
360
            raise ValueError(m)
361

    
362
        optget = opts.get
363
        kwget = kw.get
364
        minlen = kwget('minlen', optget('minlen', 0))
365
        maxlen = kwget('maxlen', optget('maxlen', 32))
366
        alphabet = kwget('alphabet', self.default_alphabet)
367
        z = maxlen - minlen
368
        if z < 1:
369
            z = 1
370

    
371
        g = log(z, 2)
372
        r = random() * g
373
        z = minlen + int(2**r)
374

    
375
        s = u''
376
        for _ in xrange(z):
377
            s += choice(alphabet)
378

    
379
        return s
380

    
381
    random_choice = random_bytes
382

    
383

    
384
class ListOf(Canonical):
385

    
386
    def init(self):
387
        args = self.args
388
        kw = self.kw
389

    
390
        if not (args or kw):
391
            raise SpecifyException("ListOf requires one or more arguments")
392

    
393
        if args and kw:
394
            m = ("ListOf requires either positional "
395
                 "or keyword arguments, but not both")
396
            raise SpecifyException(m)
397

    
398
        if args:
399
            if len(args) > 1:
400
                self.canonical = Tuple(*args)
401
            else:
402
                self.canonical = args[0]
403
        else:
404
            self.canonical = Args(**kw)
405

    
406
    def check(self, item):
407
        if item is None:
408
            item = ()
409

    
410
        try:
411
            items = iter(item)
412
        except TypeError, e:
413
            m = "%s: %s is not iterable" % (self, shorts(item))
414
            raise CanonifyException(m)
415

    
416
        canonical = self.canonical
417
        canonified = []
418
        append = canonified.append
419

    
420
        for item in items:
421
            item = canonical(item)
422
            append(item)
423

    
424
        if not canonified and self.opts.get('nonempty', False):
425
            m = "%s: must be nonempty" % (self,)
426
            raise CanonifyException(m)
427

    
428
        return canonified
429

    
430
    def random_listof(self, kw):
431
        z = randint(1, 4)
432
        get_random = self.canonical.random
433

    
434
        return [get_random() for _ in xrange(z)]
435

    
436
    random_choice = random_listof
437

    
438

    
439
class Args(Canonical):
440

    
441
    def init(self):
442
        if self.args:
443
            raise ValueError("Args accepts only keyword arguments")
444

    
445
    def check(self, item):
446
        try:
447
            item = dict(item)
448
        except TypeError, e:
449
            m = "%s: %s is not dict-able" % (self, shorts(item))
450
            raise CanonifyException(m)
451

    
452
        canonified = {}
453

    
454
        try:
455
            for n, c in self.kw.items():
456
                t = item[n] if n in item else None
457
                canonified[n] = c(t)
458
        except KeyError:
459
            m = ("%s: Argument '%s' not found in '%s'" 
460
                        % (self, shorts(n), shorts(item)))
461
            raise CanonifyException(m)
462

    
463
        return canonified
464

    
465
    def random_args(self, kw):
466
        args = {}
467
        for n, c in self.kw.items():
468
            args[n] = c.random()
469
        return args
470

    
471
    random_choice = random_args
472

    
473

    
474
class Tuple(Canonical):
475

    
476
    def check(self, item):
477
        try:
478
            items = list(item)
479
        except TypeError, e:
480
            m = "%s: %s is not iterable" % (self, shorts(item))
481
            raise CanonifyException(m)
482

    
483
        canonicals = self.args
484
        zi = len(items)
485
        zc = len(canonicals)
486

    
487
        if zi != zc:
488
            m = "%s: expecting %d elements, not %d (%s)" % (self, zc, zi, str(items))
489
            raise CanonifyException(m)
490

    
491
        g = (canonical(element) for canonical, element in zip(self.args, item))
492

    
493
        return tuple(g)
494

    
495
    def __add__(self, other):
496
        oargs = other.args if isinstance(other, Tuple) else (other,)
497
        args = self.args + oargs
498
        return self.__class__(*args)
499

    
500
    def random_tuple(self, kw):
501
        return tuple(c.random() for c in self.args)
502

    
503
    random_choice = random_tuple
504

    
505

    
506
class Dict(Canonical):
507

    
508
    def check(self, item):
509

    
510
        try:
511
            item = dict(item)
512
        except TypeError:
513
            m = "%s: '%s' is not dict-able" % (self, shorts(item))
514
            raise CanonifyException(m)
515

    
516
        canonified = {}
517
        canonical = self.kw
518

    
519
        for n, c in canonical.items():
520
            if n not in item:
521
                m = "%s: key '%s' not found" % (self, shorts(n))
522
                raise CanonifyException(m)
523
            canonified[n] = c(item[n])   
524

    
525
        strict = self.opts.get('strict', False)
526
        if strict and len(item) != len(canonical):
527
            for k in sorted(item.keys()):
528
                if k not in canonical:
529
                    break
530

    
531
            m = "%s: unexpected key '%s' (strict mode)" % (self, shorts(k))
532
            raise CanonifyException(m)
533

    
534
        return canonified
535

    
536
    def random_dict(self, kw):
537
        item = {}
538
        for n, c in self.canonical.items():
539
            item[n] = c.random()
540

    
541
        return item
542

    
543
    random_choice = random_dict
544

    
545

    
546
class Canonifier(object):
547
    def __init__(self, name, input_canonicals, output_canonicals):
548
        self.name = name
549
        self.input_canonicals = dict(input_canonicals)
550
        self.output_canonicals = dict(output_canonicals)
551

    
552
    def call_names(self):
553
        return self.input_canonicals.keys()
554

    
555
    def call_attrs(self):
556
        for call_name, canonical in self.input_canonicals.iteritems():
557
            yield call_name, canonical.tostring(showopts=1, multiline=1)
558

    
559
    def input_canonical(self, name):
560
        input_canonicals = self.input_canonicals
561
        if name not in input_canonicals:
562
            m = "%s: Invalid input call '%s'" % (self.name, name)
563
            raise CanonifyException(m)
564

    
565
        return input_canonicals[name]
566

    
567
    def canonify_input(self, name, the_input):
568
        return self.input_canonical(name)(the_input)
569

    
570
    def output_canonical(self, name):
571
        output_canonicals = self.output_canonicals
572
        if name not in output_canonicals:
573
            m = "%s: Output canonical '%s' does not exist" % (self.name, name)
574
            raise CanonifyException(m)
575

    
576
        return output_canonicals[name]
577

    
578
    def canonify_output(self, name, the_output):
579
        return self.output_canonical(name)(the_output)
580

    
581

    
582
class Specificator(object):
583

    
584
    def __new__(cls):
585
        if cls is Specificator:
586
            m = "Specificator classes must be subclassed"
587
            raise SpecifyException(m)
588

    
589
        import inspect
590

    
591
        canonical_inputs = {}
592
        canonical_outputs = {}
593

    
594
        for name in dir(cls):
595
            f = getattr(cls, name)
596
            if not inspect.ismethod(f) or f.__name__.startswith('_'):
597
                continue
598

    
599
            argspec = inspect.getargspec(f)
600
            defaults = argspec.defaults
601
            args = argspec.args
602
            if args and args[0] == 'self':
603
                args = args[1:]
604

    
605
            if not defaults:
606
                defaults = ()
607

    
608
            arglen = len(args)
609
            deflen = len(defaults)
610

    
611
            if arglen != deflen:
612
                a = (f.__name__, args[:arglen-deflen])
613
                m = "Unspecified arguments in '%s': %s" % a
614
                raise SpecifyException(m)
615

    
616
            args = dict(zip(args, defaults))
617
            for a, c in args.items():
618
                if not isinstance(c, Canonical):
619
                    m = ("argument '%s=%s' is not an instance of 'Canonical'"
620
                         % (a, repr(c)))
621
                    raise SpecifyException(m)
622

    
623
            canonical = Null() if len(args) == 0 else Args(**args)
624
            canonical_inputs[name] = canonical
625

    
626
            self = object.__new__(cls)
627
            canonical = f(self)
628
            if not isinstance(canonical, Canonical):
629
                m = ("method '%s' does not return a Canonical, but a(n) %s "
630
                                                    % (name, type(canonical)))
631
                raise SpecifyException(m)
632
            canonical_outputs[name] = canonical
633

    
634
        return Canonifier(cls.__name__, canonical_inputs, canonical_outputs)
635

    
636
    def __call__(self):
637
        return self
638