Source code for bases.random

"""
    Functions to generate random data.
"""

from __future__ import annotations

# pylint: disable = global-statement

from contextlib import contextmanager
from itertools import chain, islice
from random import Random # pylint: disable = import-self
from types import MappingProxyType
from typing import Any, Dict, Iterator, Mapping, Optional
from typing_validation import validate

from .alphabet import Alphabet
from .encoding import BaseEncoding, SimpleBaseEncoding, ZeropadBaseEncoding, BlockBaseEncoding, FixcharBaseEncoding


_default_options: Mapping[str, Any] = MappingProxyType({
    "min_bytes": 0,
    "max_bytes": 16,
    "min_chars": 0,
    "max_chars": 16,
})


_options: Mapping[str, Any] = MappingProxyType(_default_options)
_rand: Random = Random(0)


[docs] def reset_options() -> None: """ Resets random generation options to their default values. """ global _options global _rand _options = _default_options _rand = Random(0)
[docs] def default_options() -> Mapping[str, Any]: """ Readonly view of the default random generation options. """ return _default_options
[docs] def get_options() -> Mapping[str, Any]: """ Readonly view of the current random generation options. """ return _options
[docs] @contextmanager def options(*, seed: Optional[int] = None, min_bytes: Optional[int] = None, max_bytes: Optional[int] = None, min_chars: Optional[int] = None, max_chars: Optional[int] = None,) -> Iterator[None]: """ Returns with-statement context manager for temporary option setting: .. code-block:: python with options(**options): for value in rand_data(num_samples, encoding): ... See :func:`set_options` for a description of the options. """ # pylint: disable = too-many-locals for arg in (seed, min_bytes, max_bytes, min_chars, max_chars): validate(arg, Optional[int]) global _options global _rand _old_options = _options _old_rand = _rand try: set_options(seed=seed, min_bytes=min_bytes, max_bytes=max_bytes, min_chars=min_chars, max_chars=max_chars,) yield finally: _options = _old_options _rand = _old_rand
[docs] def set_options(*, seed: Optional[int] = None, min_bytes: Optional[int] = None, max_bytes: Optional[int] = None, min_chars: Optional[int] = None, max_chars: Optional[int] = None,) -> None: """ Permanently sets random generation options: .. code-block:: python seed: int # set new random number generator, with this seed min_bytes: int # min length of `bytes` value max_bytes: int # max length of `bytes` value min_chars: int # min length of `str` value max_chars: int # max length of `str` value """ # pylint: disable = too-many-branches, too-many-locals, too-many-statements for arg in (seed, min_bytes, max_bytes, min_chars, max_chars): validate(arg, Optional[int]) global _options global _rand # set newly passed options _new_options: Dict[str, Any] = {} if seed is not None: _rand = Random(seed) if min_bytes is not None: if min_bytes < 0: raise ValueError("Value for min_bytes is negative.") _new_options["min_bytes"] = min_bytes if max_bytes is not None: if max_bytes < 0: raise ValueError("Value for max_bytes is negative.") _new_options["max_bytes"] = max_bytes if min_chars is not None: if min_chars < 0: raise ValueError("Value for min_chars is negative.") _new_options["min_chars"] = min_chars if max_chars is not None: if max_chars < 0: raise ValueError("Value for max_chars is negative.") _new_options["max_chars"] = max_chars # pass-through other options with former values for k, v in _options.items(): if k not in _new_options: _new_options[k] = v # check compatibility conditions if _new_options["min_bytes"] > _new_options["max_bytes"]: raise ValueError("Value for min_bytes is larger than value for max_bytes.") if _new_options["min_chars"] > _new_options["max_chars"]: raise ValueError("Value for min_chars is larger than value for max_chars.") # update options _options = MappingProxyType(_new_options)
[docs] def rand_bytes(n: Optional[int] = None, *, encoding: Optional[BaseEncoding] = None) -> Iterator[bytes]: """ Generates a stream of random :obj:`bytes` objects. If a number ``n`` is given, that number of samples is yelded. If an encoding ``encoding`` is given, only bytes valid for that encoding are yielded. Example usage: >>> my_random_bytes = list(random.rand_bytes(4, encoding=base10)) >>> [list(b) for b in my_random_bytes] [[0, 30, 135, 156, 223, 90, 134, 83, 6, 243, 245], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 49, 216, 87, 1, 2], [70, 98, 190, 187, 66, 224, 178], [0, 96, 63]] :param n: the number of samples :type n: :obj:`int` or :obj:`None`, *optional* :param encoding: optional encoding for which the bytestrings must be valid :type encoding: :obj:`~bases.encoding.base.BaseEncoding` or :obj:`None`, *optional* """ validate(n, Optional[int]) validate(encoding, Optional[BaseEncoding]) if encoding is None: return rand_raw_bytes(n) if isinstance(encoding, SimpleBaseEncoding): return _rand_bytes_simple_enc(n, encoding) if isinstance(encoding, ZeropadBaseEncoding): return _rand_bytes_zeropad_enc(n, encoding) if isinstance(encoding, BlockBaseEncoding): return _rand_bytes_block_enc(n, encoding) if isinstance(encoding, FixcharBaseEncoding): return _rand_bytes_fixedchar_enc(n, encoding) raise ValueError(f"Unsupported encoding type {type(encoding)}")
[docs] def rand_raw_bytes(n: Optional[int] = None, *, min_bytes: Optional[int] = None, max_bytes: Optional[int] = None) -> Iterator[bytes]: """ Generates a stream of random :obj:`bytes` objects. If a number ``n`` is given, that number of samples is yelded. The optional ``min_bytes`` and ``max_bytes`` parameters can be used to set a minimum/maximum length for the :obj:`bytes` objects: if :obj:`None`, the values are fetched from :func:`get_options`. :param n: the number of samples :type n: :obj:`int` or :obj:`None`, *optional* :param min_bytes: the minimum length for the bytestrings :type min_bytes: :obj:`int` or :obj:`None`, *optional* :param max_bytes: the maximum length for the bytestrings :type max_bytes: :obj:`int` or :obj:`None`, *optional* """ validate(n, Optional[int]) validate(min_bytes, Optional[int]) validate(max_bytes, Optional[int]) if n is not None and n < 0: raise ValueError() if min_bytes is None: min_bytes = _options["min_bytes"] if max_bytes is None: max_bytes = _options["max_bytes"] rand = _rand # main yielding loop yielded = 0 while n is None or yielded < n: # sample random length l = rand.randint(min_bytes, max_bytes) # yield random unsigned integer filling l bytes i = rand.randrange(0, 256**l) yield i.to_bytes(l, byteorder="big") yielded += 1
def _rand_bytes_simple_enc(n: Optional[int], _: SimpleBaseEncoding) -> Iterator[bytes]: if n is not None and n < 0: raise ValueError() min_bytes = _options["min_bytes"] max_bytes = _options["max_bytes"] rand = _rand # main yielding loop yielded = 0 while n is None or yielded < n: # sample random length l = rand.randint(min_bytes, max_bytes) # yield random unsigned integer filling l bytes with no leading zero bytes if l == 0: i = 0 else: i = rand.randrange(256**(l-1), 256**l) yield i.to_bytes(l, byteorder="big") yielded += 1 def _rand_bytes_zeropad_enc(n: Optional[int], _: ZeropadBaseEncoding) -> Iterator[bytes]: if n is not None and n < 0: raise ValueError() min_bytes = _options["min_bytes"] max_bytes = _options["max_bytes"] rand = _rand # main yielding loop yielded = 0 while n is None or yielded < n: # sample random length l = rand.randint(min_bytes, max_bytes) # sample random number of leading zero bytes z = rand.randint(0, l) # yield random unsigned integer filling l-z bytes if l == z: i = 0 else: i = rand.randrange(256**(l-z-1), 256**(l-z)) yield i.to_bytes(l, byteorder="big") yielded += 1 def _rand_bytes_block_enc(n: Optional[int], encoding: BlockBaseEncoding) -> Iterator[bytes]: if n is not None and n < 0: raise ValueError() min_bytes = _options["min_bytes"] max_bytes = _options["max_bytes"] rand = _rand # pre-compute valid bytestring lengths for block base encoding block_nbytes = encoding.block_nbytes nbytes2nchars = encoding.nbytes2nchars valid_lengths = [l for l in range(min_bytes, max_bytes+1) if l%block_nbytes == 0 or l%block_nbytes in nbytes2nchars] # main yielding loop yielded = 0 while n is None or yielded < n: # sample random valid length l = rand.choice(valid_lengths) # yield random unsigned integer filling l bytes i = rand.randrange(0, 256**l) yield i.to_bytes(l, byteorder="big") yielded += 1 def _rand_bytes_fixedchar_enc(n: Optional[int], _: FixcharBaseEncoding) -> Iterator[bytes]: return rand_raw_bytes(n)
[docs] def rand_str(n: Optional[int] = None, *, encoding: Optional[BaseEncoding]=None, alphabet: Optional[Alphabet]=None) -> Iterator[str]: """ Generates a stream of random strings. If a number ``n`` is given, that number of samples is yelded. Exactly one of ``encoding`` or ``alphabet`` must be given: - if an ``encoding`` is given, only strings valid for that encoding are yielded - if an ``alphabet`` is given, only strings valid for that alphabet are yielded Example usage: >>> my_random_strings = list(random.rand_str(4, encoding=base32)) >>> my_random_strings ['2CQ7ZT6WNI', 'IGQJTGA', 'V6GW3UN64QDAFZA7', 'PUEMOPJ4'] :param n: the number of samples :type n: :obj:`int` or :obj:`None`, *optional* :param encoding: optional encoding for which the strings must be valid :type encoding: :obj:`~bases.encoding.base.BaseEncoding` or :obj:`None`, *optional* :param alphabet: optional alphabet for which the bytestrings must be valid :type alphabet: :obj:`~bases.alphabet.abstract.Alphabet` or :obj:`None`, *optional* :raises ValueError: unless exactly one of ``encoding`` or ``alphabet`` is specified :raises ValueError: if an instance of a an unsupported (i.e. custom) base encoding subclass is passed to ``encoding`` """ validate(n, Optional[int]) validate(encoding, Optional[BaseEncoding]) validate(alphabet, Optional[Alphabet]) if encoding is None: if alphabet is None: raise ValueError("One of 'encoding' or 'alphabet' must be specified.") return _rand_alphabet_string(n, alphabet) if alphabet is not None: raise ValueError("Exactly one of 'encoding' or 'alphabet' must be specified.") if isinstance(encoding, SimpleBaseEncoding): return _rand_str_simple_enc(n, encoding) if isinstance(encoding, ZeropadBaseEncoding): return _rand_str_zeropad_enc(n, encoding) if isinstance(encoding, BlockBaseEncoding): return _rand_str_block_enc(n, encoding) if isinstance(encoding, FixcharBaseEncoding): return _rand_str_fixedchar_enc(n, encoding) raise ValueError(f"Unsupported encoding type {type(encoding)}")
[docs] def rand_char(n: Optional[int] = None, *, alphabet: Alphabet, non_zero: bool = False) -> Iterator[str]: """ Generates a stream of random characters from the alphabet (one character yielded at a time). If a number ``n`` is given, that number of samples is yelded. If ``non_zero`` is :obj:`True`, the zero character for the alphabet is not yielded. :param n: the number of samples :type n: :obj:`int` or :obj:`None`, *optional* :param alphabet: optional alphabet for which the characters must be valid :type alphabet: :obj:`~bases.alphabet.abstract.Alphabet` or :obj:`None`, *optional* :param non_zero: whether to exclude the zero character for the alphabet :type non_zero: :obj:`bool`, *optional* """ if n is not None and n < 0: raise ValueError() start = 1 if non_zero else 0 end = len(alphabet) rand = _rand yielded = 0 while n is None or yielded < n: # yield random character (excluding zero character, if non_zero is True) idx = rand.randrange(start, end) yield alphabet[idx] yielded += 1
def _rand_alphabet_string(n: Optional[int], alphabet: Alphabet) -> Iterator[str]: if n is not None and n < 0: raise ValueError() min_chars = _options["min_chars"] max_chars = _options["max_chars"] rand = _rand # infinte random character stream rand_char_stream = rand_char(alphabet=alphabet) # main yielding loop yielded = 0 while n is None or yielded < n: # sample random length l = rand.randint(min_chars, max_chars) # yield random string filling l characters yield "".join(islice(rand_char_stream, l)) yielded += 1 def _rand_str_simple_enc(n: Optional[int], encoding: SimpleBaseEncoding) -> Iterator[str]: if n is not None and n < 0: raise ValueError() min_chars = _options["min_chars"] max_chars = _options["max_chars"] rand = _rand # infinte random character streams rand_char_stream = rand_char(alphabet=encoding.alphabet) rand_nonzero_char_stream = rand_char(alphabet=encoding.alphabet, non_zero=True) # main yielding loop yielded = 0 while n is None or yielded < n: # sample random length l = rand.randint(min_chars, max_chars) # yield random str filling l characters with no leading zero characters if l == 0: yield "" else: yield "".join(chain(islice(rand_nonzero_char_stream, 1), islice(rand_char_stream, l-1))) yielded += 1 def _rand_str_zeropad_enc(n: Optional[int], encoding: ZeropadBaseEncoding) -> Iterator[str]: if n is not None and n < 0: raise ValueError() min_chars = _options["min_chars"] max_chars = _options["max_chars"] rand = _rand # zero character zero_char = encoding.zero_char # infinte random character streams rand_char_stream = rand_char(alphabet=encoding.alphabet) rand_nonzero_char_stream = rand_char(alphabet=encoding.alphabet, non_zero=True) # main yielding loop yielded = 0 while n is None or yielded < n: # sample random length l = rand.randint(min_chars, max_chars) # sample random number of leading zero chars z = rand.randint(0, l) # yield random str filling l characters with given number of leading zeros if l-z == 0: yield zero_char*z else: yield zero_char*z+"".join(chain(islice(rand_nonzero_char_stream, 1), islice(rand_char_stream, l-z-1))) yielded += 1
[docs] def rand_block_chars(n: Optional[int] = None, *, block_nchars: int, encoding: BlockBaseEncoding) -> Iterator[str]: """ Generates a stream of random char blocks for a block base encoding. If a number ``n`` is given, that number of samples is yelded. The number ``block_nchars`` of characters in the blocks must be valid for the encoding. :param n: the number of samples :type n: :obj:`int` or :obj:`None`, *optional* :param block_nchars: the number of characters in a block :type block_nchars: :obj:`int` :param encoding: block encoding for which the char blocks must be valid :type encoding: :obj:`~bases.encoding.block.BlockBaseEncoding` """ if n is not None and n < 0: raise ValueError() # extract block size in chars and bytes nchars2nbytes = encoding.nchars2nbytes if block_nchars not in nchars2nbytes: raise ValueError(f"Invalid number of characters per block ({block_nchars})") block_nbytes = nchars2nbytes[block_nchars] # infinite random byte stream rand_bytes_stream = rand_raw_bytes(min_bytes=1, max_bytes=1) # main yielding loop yielded = 0 while n is None or yielded < n: block_bytes = b"".join(islice(rand_bytes_stream, block_nbytes)) s = encoding.encode(block_bytes) yield s yielded += 1
def _rand_str_block_enc(n: Optional[int], encoding: BlockBaseEncoding) -> Iterator[str]: if n is not None and n < 0: raise ValueError() min_chars = _options["min_chars"] max_chars = _options["max_chars"] rand = _rand # pre-compute valid string lengths for block base encoding block_nchars = encoding.block_nchars nchars2nbytes = encoding.nchars2nbytes valid_lengths = [l for l in range(min_chars, max_chars+1) if l%block_nchars == 0 or l%block_nchars in nchars2nbytes] # infinte random character streams rand_block_stream = { nchars: rand_block_chars(block_nchars=nchars, encoding=encoding) for nchars in nchars2nbytes } # main yielding loop yielded = 0 while n is None or yielded < n: # sample random valid length l = rand.choice(valid_lengths) num_full_blocks, final_block_nchars = divmod(l, block_nchars) full_blocks = islice(rand_block_stream[block_nchars], num_full_blocks) if final_block_nchars == 0: yield "".join(full_blocks) else: final_block = islice(rand_block_stream[final_block_nchars], 1) yield "".join(chain(full_blocks, final_block)) yielded += 1 def _rand_str_fixedchar_enc(n: Optional[int], encoding: FixcharBaseEncoding) -> Iterator[str]: # pylint: disable = too-many-locals if n is not None and n < 0: raise ValueError() min_chars = _options["min_chars"] max_chars = _options["max_chars"] rand = _rand # pre-compute valid string lengths for fixed-char base encoding alphabet = encoding.alphabet char_nbits = encoding.char_nbits valid_lengths = [l for l in range(min_chars, max_chars+1) if (l*char_nbits)%8 < char_nbits] # infinte random character stream rand_char_stream = rand_char(alphabet=encoding.alphabet) # main yielding loop yielded = 0 while n is None or yielded < n: # sample random length l = rand.choice(valid_lengths) # yield random str filling l characters with pad bits set to zero extra_nbits = (l*char_nbits)%8 if extra_nbits == 0: s = "".join(islice(rand_char_stream, l)) else: all_chars_but_last = "".join(islice(rand_char_stream, l-1)) last_char_idx = rand.randrange(0, 2**(char_nbits-extra_nbits))<<extra_nbits last_char = alphabet[last_char_idx] s = all_chars_but_last+last_char if encoding.padding == "ignore": yield s else: yield encoding.pad_string(s) yielded += 1