Source code for cascada.linear.characteristic

"""Manipulate non-symbolic linear characteristics.

.. autosummary::
   :nosignatures:

    Characteristic
    EncryptionCharacteristic
"""
import decimal
import collections
import functools
import itertools
import math
import multiprocessing
import random

from cascada.bitvector import core
from cascada.bitvector import context
from cascada.bitvector import ssa as cascada_ssa
from cascada.linear import chmodel as cascada_chmodel
from cascada.linear import opmodel as cascada_opmodel

from cascada import abstractproperty


zip = functools.partial(zip, strict=True)
EmpiricalWeightData = abstractproperty.characteristic.EmpiricalWeightData


[docs]class Characteristic(abstractproperty.characteristic.Characteristic): """Represent linear characteristics over bit-vector functions. Internally, this class is a subclass of `abstractproperty.characteristic.Characteristic`, where the `Property` is a `LinearMask` type. As mentioned in `abstractproperty.characteristic.Characteristic`, the characteristic probability is defined as the product of the propagation probability (absolute correlation) of the `LinearMask` pairs (linear approximations) :math:`(\Delta_{x_{i}} \mapsto \Delta_{x_{i+1}})` over :math:`f_i`. If :math:`f` has external variables, the characteristic probability approximates the absolute correlation of the input-output mask pair of the characteristic averaged over the set of all values of the external variables. >>> from cascada.bitvector.core import Constant >>> from cascada.bitvector.operation import RotateLeft, RotateRight >>> from cascada.linear.mask import LinearMask, LinearMask >>> from cascada.linear.chmodel import ChModel >>> from cascada.linear.characteristic import Characteristic >>> from cascada.primitives import speck >>> Speck32_KS = speck.get_Speck_instance(speck.SpeckInstance.speck_32_64).key_schedule >>> Speck32_KS.set_num_rounds(2) >>> ch_model = ChModel(Speck32_KS, LinearMask, ["mk0", "mk1", "mk2"]) >>> zm = core.Constant(0, width=16) >>> mk0 = RotateLeft(Constant(1, width=16), 8) # mk0 >>> 7 == 0b0···010 >>> mx5 = RotateRight(mk0, 7) # ~ mk0 >>> 7 == 0b0···010 >>> mx3__1 = RotateRight(Constant(1, width=16), 1) # mx3__1 <<< 2 == 0b0···010 >>> mx3_out = core.Constant(0x8002, 16) >>> assign_outmask_list = [zm, zm, zm, zm, mx5, mx3__1, mx5, mx5, zm, mx3_out, mx5] >>> ch = Characteristic([mk0, zm, zm], [zm, mx3_out, mx5], assign_outmask_list, ch_model) >>> ch # doctest: +NORMALIZE_WHITESPACE Characteristic(ch_weight=1, assignment_weights=[0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0], input_mask=[0x0100, 0x0000, 0x0000], output_mask=[0x0000, 0x8002, 0x0002], assign_outmask_list=[0x0000, 0x0000, 0x0000, 0x0000, 0x0002, 0x8000, 0x0002, 0x0002, 0x0000, 0x8002, 0x0002]) >>> list(zip(ch.assignment_weights, ch.tuple_assign_outmask2op_model)) # doctest: +NORMALIZE_WHITESPACE [(Decimal('0'), (LinearMask(0x0000), LinearModelFreeBranch(LinearMask(0x0000)))), (Decimal('0'), (LinearMask(0x0000), LinearModelFreeBranch(LinearMask(0x0000)))), (Decimal('0'), (LinearMask(0x0000), LinearModelBvAdd([LinearMask(0x0000), LinearMask(0x0000)]))), (Decimal('0'), (LinearMask(0x0000), LinearModelBvXor([LinearMask(0x0000), LinearMask(0x0000)]))), (Decimal('0'), (LinearMask(0x0002), LinearModelFreeBranch(LinearMask(0x0000)))), (Decimal('0'), (LinearMask(0x8000), LinearModelFreeBranch(LinearMask(0x0000)))), (Decimal('1'), (LinearMask(0x0002), LinearModelBvAdd([LinearMask(0x0002), LinearMask(0x0002)]))), (Decimal('0'), (LinearMask(0x0002), LinearModelBvXor([LinearMask(0x0002), LinearMask(0x0002)]))), (Decimal('0'), (LinearMask(0x0000), LinearModelId(LinearMask(0x0000)))), (Decimal('0'), (LinearMask(0x8002), LinearModelId(LinearMask(0x8002)))), (Decimal('0'), (LinearMask(0x0002), LinearModelId(LinearMask(0x0002))))] Attributes: input_mask: a list of `LinearMask` objects containing the (constant) input mask (alias of `abstractproperty.characteristic.Characteristic.input_prop`). output_mask: a list of `LinearMask` objects containing the (constant) output mask (alias of `abstractproperty.characteristic.Characteristic.output_prop`). external_masks: a list containing the (constant) `LinearMask` of the external variables of the function (alias of `abstractproperty.characteristic.Characteristic.external_props`). tuple_assign_outmask2op_model: a tuple where each element is a pair containing: (1) the output (constant) `LinearMask` :math:`\Delta_{x_{i+1}}` of the non-trivial assignment :math:`x_{i+1} \leftarrow f_i(x_i)` and (2) the `linear.opmodel.OpModel` of this assignment with a (constant) input `LinearMask` :math:`\Delta_{x_{i}}` (alias of `abstractproperty.characteristic.Characteristic.tuple_assign_outprop2op_model`). free_masks: a list of (symbolic) `LinearMask` objects of the `Characteristic.ch_model`, whose values do not affect the characteristic, and were replaced by constant masks in `input_mask`, `output_mask`, `external_masks` or `tuple_assign_outmask2op_model` (alias of `abstractproperty.characteristic.Characteristic.free_props`). var_mask2ct_mask: a `collections.OrderedDict` mapping each symbolic `LinearMask` in the trail to its constant mask (alias of `abstractproperty.characteristic.Characteristic.var_prop2ct_prop`). """ _prop_label = "mask" # for str and vrepr @property def input_mask(self): return self.input_prop @property def output_mask(self): return self.output_prop @property def external_masks(self): return self.external_props @property def tuple_assign_outmask2op_model(self): return self.tuple_assign_outprop2op_model @property def free_masks(self): return self.free_props @property def var_mask2ct_mask(self): return self.var_prop2ct_prop def __init__(self, input_mask, output_mask, assign_outmask_list, ch_model, external_masks=None, free_masks=None, empirical_ch_weight=None, empirical_data_list=None, is_valid=True): super().__init__( input_prop=input_mask, output_prop=output_mask, assign_outprop_list=assign_outmask_list, ch_model=ch_model, external_props=external_masks, free_props=free_masks, empirical_ch_weight=empirical_ch_weight, empirical_data_list=empirical_data_list, is_valid=is_valid )
[docs] def vrepr(self, ignore_external_masks=False): """Return an executable string representation. See also `abstractproperty.characteristic.Characteristic.vrepr`. >>> from cascada.bitvector.core import Constant >>> from cascada.bitvector.operation import RotateLeft, RotateRight >>> from cascada.linear.mask import LinearMask >>> from cascada.linear.chmodel import ChModel >>> from cascada.primitives import speck >>> Speck32_KS = speck.get_Speck_instance(speck.SpeckInstance.speck_32_64).key_schedule >>> Speck32_KS.set_num_rounds(2) >>> ch_model = ChModel(Speck32_KS, LinearMask, ["mk0", "mk1", "mk2"]) >>> zm = core.Constant(0, width=16) >>> mk0 = RotateLeft(Constant(1, width=16), 8) # mk0 >>> 7 == 0b0···010 >>> mx5 = RotateRight(mk0, 7) # ~ mk0 >>> 7 == 0b0···010 >>> mx3__1 = RotateRight(Constant(1, width=16), 1) # mx3__1 <<< 2 == 0b0···010 >>> mx3_out = core.Constant(0x8002, 16) >>> assign_outmask_list = [zm, zm, zm, zm, mx5, mx3__1, mx5, mx5, zm, mx3_out, mx5] >>> ch = Characteristic([mk0, zm, zm], [zm, mx3_out, mx5], assign_outmask_list, ch_model) >>> ch.vrepr() # doctest: +NORMALIZE_WHITESPACE "Characteristic(input_mask=[Constant(0x0100, width=16), Constant(0x0000, width=16), Constant(0x0000, width=16)], output_mask=[Constant(0x0000, width=16), Constant(0x8002, width=16), Constant(0x0002, width=16)], assign_outmask_list=[Constant(0x0000, width=16), Constant(0x0000, width=16), Constant(0x0000, width=16), Constant(0x0000, width=16), Constant(0x0002, width=16), Constant(0x8000, width=16), Constant(0x0002, width=16), Constant(0x0002, width=16), Constant(0x0000, width=16), Constant(0x8002, width=16), Constant(0x0002, width=16)], ch_model=ChModel(func=SpeckKeySchedule.set_num_rounds_and_return(2), mask_type=LinearMask, input_mask_names=['mk0', 'mk1', 'mk2'], prefix='mx'))" >>> ch.srepr() 'Ch(w=1, id=0100 0000 0000, od=0000 8002 0002)' """ return super().vrepr(ignore_external_masks)
[docs] def split(self, mask_separators): """Split into multiple `Characteristic` objects given the list of mask separators. See also `abstractproperty.characteristic.Characteristic.split`. >>> from cascada.bitvector.core import Constant, Variable >>> from cascada.bitvector.operation import RotateLeft, RotateRight >>> from cascada.linear.mask import LinearMask >>> from cascada.linear.chmodel import ChModel >>> from cascada.linear.characteristic import Characteristic >>> from cascada.primitives import speck >>> Speck32_KS = speck.get_Speck_instance(speck.SpeckInstance.speck_32_64).key_schedule >>> Speck32_KS.set_num_rounds(2) >>> ch_model = ChModel(Speck32_KS, LinearMask, ["mk0", "mk1", "mk2"]) >>> tuple(ch_model.ssa.assignments.items()) # doctest: +NORMALIZE_WHITESPACE ((mx0, mk1 >>> 7), (mk2__0, Id(mk2)), (mk2__1, Id(mk2)), (mk2__2, Id(mk2)), (mx1, mx0 + mk2__0), (mx2, mk2__1 <<< 2), (mx3, mx2 ^ mx1), (mx4, mk0 >>> 7), (mx3__0, Id(mx3)), (mx3__1, Id(mx3)), (mx3__2, Id(mx3)), (mx5, mx4 + mx3__0), (mx6, mx5 ^ 0x0001), (mx7, mx3__1 <<< 2), (mx8, mx7 ^ mx6), (mk2_out, Id(mk2__2)), (mx3_out, Id(mx3__2)), (mx8_out, Id(mx8))) >>> mask_separators = [ (LinearMask(Variable("mx2", width=16)), LinearMask(Variable("mx3", width=16))), ] >>> zm = core.Constant(0, width=16) >>> mk0 = RotateLeft(Constant(1, width=16), 8) # mk0 >>> 7 == 0b0···010 >>> mx5 = RotateRight(mk0, 7) # ~ mk0 >>> 7 == 0b0···010 >>> mx3__1 = RotateRight(Constant(1, width=16), 1) # mx3__1 <<< 2 == 0b0···010 >>> mx3_out = core.Constant(0x8002, 16) >>> assign_outmask_list = [zm, zm, zm, zm, mx5, mx3__1, mx5, mx5, zm, mx3_out, mx5] >>> ch = Characteristic([mk0, zm, zm], [zm, mx3_out, mx5], assign_outmask_list, ch_model) >>> for ch in ch.split(mask_separators): print(ch) # doctest: +NORMALIZE_WHITESPACE Characteristic(ch_weight=0, assignment_weights=[0, 0, 0, 0, 0, 0, 0], input_mask=[0x0100, 0x0000, 0x0000], output_mask=[0x0100, 0x0000, 0x0000], assign_outmask_list=[0x0000, 0x0000, 0x0000, 0x0000, 0x0100, 0x0000, 0x0000]) Characteristic(ch_weight=1, assignment_weights=[0, 0, 1, 0, 0, 0, 0], input_mask=[0x0100, 0x0000, 0x0000], output_mask=[0x0000, 0x8002, 0x0002], assign_outmask_list=[0x0002, 0x8000, 0x0002, 0x0002, 0x0000, 0x8002, 0x0002]) """ return super().split(mask_separators)
@classmethod def _sample_outprop_opmodel(cls, mask_type, ct_op_model, outmask_width, get_random_bv): if outmask_width > 8: raise ValueError("random does not support characteristics with masks with more than 8 bits") if isinstance(ct_op_model, cascada_opmodel.LinearModelBvXor): if ct_op_model.input_mask[0] != ct_op_model.input_mask[1]: return None else: return ct_op_model.input_mask[0] else: return super()._sample_outprop_opmodel(mask_type, ct_op_model, outmask_width, get_random_bv) # elif isinstance(ct_op_model, cascada_opmodel.LinearModelBvAdd) and outmask_width >= 8: # # LinearModelBvAdd for more than 8-bit too slow # from cascada.smt.types import environment, bv2pysmt, pysmt2bv # var_prop = mask_type(core.Variable("output_mask", outmask_width)) # with context.Simplification(False), context.Validation(False): # # explicit=True to ensure only one variable in pysmt_model # bv_expr = ct_op_model.validity_constraint(var_prop, explicit=True) # env = environment.reset_env() # pysmt_expr = bv2pysmt(bv_expr, boolean=True, env=env) # pysmt_model = env.factory.get_model(pysmt_expr) # if pysmt_model is None: # return None # for _, pysmt_val in pysmt_model: # bv_val = pysmt2bv(pysmt_val) # break # return mask_type(bv_val)
[docs] @classmethod def random(cls, ch_model, seed, external_masks=None): """Return a random `Characteristic` with given `linear.chmodel.ChModel`. See also `abstractproperty.characteristic.Characteristic.random`. >>> from cascada.linear.mask import LinearMask >>> from cascada.linear.chmodel import ChModel >>> from cascada.linear.characteristic import Characteristic >>> from cascada.bitvector.ssa import BvFunction >>> class MyFoo(BvFunction): ... input_widths, output_widths = [4, 4], [4, 4] ... @classmethod ... def eval(cls, a, b): return a + b, a >>> ch_model = ChModel(MyFoo, LinearMask, ["ma", "mb"]) >>> Characteristic.random(ch_model, 0) # doctest: +NORMALIZE_WHITESPACE Characteristic(ch_weight=2, assignment_weights=[0, 2, 0, 0], input_mask=[0x2, 0x6], output_mask=[0x4, 0x5], assign_outmask_list=[0x7, 0x4, 0x4, 0x5]) """ return super().random(ch_model, seed, external_masks)
@staticmethod def _num_right_inputs2weight(num_right_inputs, num_input_samples): if num_right_inputs == num_input_samples / 2: return math.inf else: inner_pr = num_right_inputs / decimal.Decimal(num_input_samples) correlation = (2 * inner_pr - 1).copy_abs() weight = - abstractproperty.opmodel.log2_decimal(correlation) assert weight >= 0 return weight def _get_empirical_ch_weights_C(self, num_input_samples, num_external_samples, seed, num_parallel_processes, verbose=False, get_sampled_inputs=False): """Return a list of empirical weights (one for each ``num_external_samples``) by compiling and executing C code. See also `differential.characteristic.Characteristic._get_empirical_ch_weights_C`. """ import uuid from cascada.bitvector.printing import BvCCodePrinter assert not (verbose and get_sampled_inputs) assert 1 <= num_input_samples <= 2 ** sum(d.val.width for d in self.input_mask) if self.external_masks: assert 1 <= num_external_samples <= 2 ** sum(d.val.width for d in self.external_masks) else: assert num_external_samples == 0 if num_input_samples.bit_length() > 64: raise ValueError("max num_input_samples supported is 2**64 - 1") width2C_type = BvCCodePrinter._width2C_type get_and_mask_C_code = BvCCodePrinter._get_and_mask_C_code ctype_num_right_inputs = width2C_type(num_input_samples.bit_length()) # 1 - Build the C code header = f"{ctype_num_right_inputs} get_num_right_inputs(uint64_t seed);" if hasattr(self.ch_model, "_unwrapped_ch_model"): # avoid gcc error due to huge line statements ssa = self.ch_model._unwrapped_ch_model.ssa else: ssa = self.ch_model.ssa eval_ssa_code_function_name = "eval_ssa" _, eval_ssa_code_body = ssa.get_C_code(eval_ssa_code_function_name) # ignore header from pathlib import Path from cascada.differential import characteristic as diff_ch_module mt19937_filename = Path(diff_ch_module.__file__).parent.resolve() / "mt19937.c" def rand(my_width): if my_width == 1: rand_str = "genrand64_int1" elif my_width <= 8: rand_str = "genrand64_int8" elif my_width <= 16: rand_str = "genrand64_int16" elif my_width <= 32: rand_str = "genrand64_int32" elif my_width <= 64: rand_str = "genrand64_int64" else: raise ValueError("random bit-vectors with more than 64 bits are not supported") return f"{rand_str}(){get_and_mask_C_code(var.width)}" # stdint already in eval_ssa and in mt19937 body = f'#include "{mt19937_filename}"\n{eval_ssa_code_body}' body += f"\n{ctype_num_right_inputs} get_num_right_inputs(uint64_t seed){{" to_sample_all_iv = num_input_samples == 2 ** sum(d.val.width for d in self.input_mask) if self.external_masks or not to_sample_all_iv: body += "\n\tinit_genrand64(seed);" body += f"\n\t{ctype_num_right_inputs} num_right_inputs = 0U;" # to_sample_all_ev cannot be used since get_num_right_inputs only returns 1 EW if ssa.external_vars: for i in range(len(ssa.external_vars)): var = ssa.external_vars[i] body += f"\n\t{width2C_type(var.width)} {var.crepr()} = {rand(var.width)};" if verbose: body += f'\n\tprintf("\\nexternal_vars[%u] = %x", {i}U, {var.crepr()});' ev_args = ', '.join([v.crepr() for v in ssa.external_vars]) + ", " else: ev_args = "" # start for if not to_sample_all_iv: body += f"\n\tfor ({ctype_num_right_inputs} i = 0U; i < {num_input_samples}U; ++i) {{" for i in range(len(ssa.input_vars)): var = ssa.input_vars[i] if to_sample_all_iv: v = var.crepr() body += f"\n\t\tfor ({width2C_type(var.width+1)} {v} = 0U; {v} < {2**(var.width)}U; ++{v}) {{" else: body += f"\n\t\t{width2C_type(var.width)} {var.crepr()} = {rand(var.width)};" if verbose: body += f'\n\t\tprintf("\\ninput sample i=%u | input_vars[%u] = %x", i, {i}U, {var.crepr()});' if get_sampled_inputs: for i in range(len(ssa.input_vars)): aux_prefix = "[" if i == 0 else "" aux_suffix = "]," if i == len(ssa.input_vars) - 1 else "" body += f'\n\t\tprintf("{aux_prefix}0x%x,{aux_suffix}", {ssa.input_vars[i].crepr()});' iv_args = ', '.join([v.crepr() for v in ssa.input_vars]) for i in range(len(ssa.output_vars)): var = ssa.output_vars[i] # var passed by reference later (no need to declare them as pointers) body += f"\n\t\t{width2C_type(var.width)} {var.crepr()};" ov_args = ', '.join(["&" + v.crepr() for v in ssa.output_vars]) body += f"\n\t\t{eval_ssa_code_function_name}({iv_args}, {ev_args}{ov_args});" # parity computation def parity_func(my_width): # __builtin_parity(x) returns the parity of x, the number of 1-bits in x modulo 2. # __builtin_parity(a & b) returns <a, b> if my_width <= 16: return "__builtin_parity" elif my_width <= 32: return "__builtin_parityl" elif my_width <= 64: return "__builtin_parityll" else: raise ValueError("parity of bit-vectors with more than 64 bits is not supported") parity_inputs = [] for i in range(len(ssa.input_vars)): parity_inputs.append(ssa.input_vars[i] & self.input_mask[i].val) for i in range(len(ssa.output_vars)): parity_inputs.append(ssa.output_vars[i] & self.output_mask[i].val) parity_outputs = [] for parity_in in parity_inputs: if isinstance(parity_in, core.Constant): # i.e., mask = 0 assert parity_in == 0 else: width = parity_in.width parity_outputs.append(f"{parity_func(width)}( ({width2C_type(width)})({parity_in.crepr()}) )") if not parity_outputs: parity_outputs.append("0U") # sum_parity = <a,x> ^ <f(x), b> sum_parity_c = " ^ ".join(parity_outputs) # "1 ^ sum_parity" is equivalent to "if (sum_parity == 0) num_right_inputs++" body += f"\n\t\tnum_right_inputs += 1U ^ ({width2C_type(1)})({sum_parity_c});" # lhs = self.input_mask[0].apply(ssa.input_vars[0]) # for i in range(1, len(ssa.input_vars)): # lhs ^= self.input_mask[i].apply(ssa.input_vars[i]) # # rhs = self.output_mask[0].apply(ssa.output_vars[0]) # for i in range(1, len(ssa.output_vars)): # rhs ^= self.output_mask[i].apply(ssa.output_vars[i]) # # if verbose: # for i in range(len(ssa.output_vars)): # var = ssa.output_vars[i] # body += f'\n\t\tprintf("\\n | output_vars[%u] = %x", {i}U, {var.crepr()});' # body += f'\n\t\tprintf("\\n | lhs[%u] = %x", {i}U, {lhs.crepr()});' # body += f'\n\t\tprintf("\\n | rhs[%u] = %x", {i}U, {rhs.crepr()});' # # if_condition = f" ({width2C_type(lhs.width)})({lhs.crepr()}) == " \ # f" ({width2C_type(rhs.width)})({rhs.crepr()})" # body += f"\n\t\tif ( {if_condition} ){{" # body += "\n\t\t\tnum_right_inputs += 1U;" # body += "\n\t\t}" if to_sample_all_iv: body += "\n\t\t" + "}"*len(ssa.input_vars) else: body += "\n\t}" # end for if verbose: body += f'\n\tprintf("\\nnum_right_inputs = %u\\n", num_right_inputs);' body += "\n\treturn num_right_inputs;\n}" # 2 - Run the C code if num_parallel_processes is None or num_external_samples <= 1: pymod, tmpdir = cascada_ssa._compile_C_code(header, body, verbose=verbose) PRNG = random.Random() PRNG.seed(seed) aux_empirical_weights = [] for _ in range(max(1, num_external_samples)): # num_external_samples can be 0 num_right_inputs = pymod.lib.get_num_right_inputs(PRNG.randrange(0, 2**64)) aux_empirical_weights.append( self._num_right_inputs2weight(num_right_inputs, num_input_samples) ) else: lib_path, module_name, tmpdir = cascada_ssa._compile_C_code(header, body, return_unloaded=True, verbose=verbose) num_parallel_processes = min(num_parallel_processes, num_external_samples) PRNG = random.Random() PRNG.seed(seed) external_seeds = [PRNG.randrange(0, 2**64) for _ in range(num_external_samples)] chunk = num_external_samples // num_parallel_processes if num_external_samples % num_parallel_processes == 0: extra_chunk = 0 else: extra_chunk = num_external_samples % num_parallel_processes assert chunk*(num_parallel_processes-1) + chunk + extra_chunk == num_external_samples aux_empirical_weights = [] with multiprocessing.Pool(num_parallel_processes) as pool: async_results = [None for _ in range(num_parallel_processes)] for ar_index in range(len(async_results)): async_results[ar_index] = pool.apply_async( diff_ch_module._run_C_code_get_num_right_inputs, ( external_seeds[ar_index*chunk], module_name, lib_path, chunk+extra_chunk if ar_index == len(async_results) - 1 else chunk ) ) pool.close() pool.join() # blocking call for ar_index in range(len(async_results)): # type(process_list[process_index]) == AsyncResult # and AsyncResult.get() blocks until result obtained list_num_right_inputs = async_results[ar_index].get() aux_empirical_weights.extend( [self._num_right_inputs2weight(nri, num_input_samples) for nri in list_num_right_inputs] ) assert len(aux_empirical_weights) == num_external_samples tmpdir.cleanup() return aux_empirical_weights def _get_empirical_ch_weights_Python(self, num_input_samples, num_external_samples, seed, list_input_samples=None): """Return a list of empirical weights (one for each ``num_external_samples``).""" assert 1 <= num_input_samples <= 2 ** sum(d.val.width for d in self.input_mask) if self.external_masks: assert 1 <= num_external_samples <= 2 ** sum(d.val.width for d in self.external_masks) else: assert num_external_samples == 0 PRNG = random.Random() PRNG.seed(seed) def get_random_bv(width): return core.Constant(PRNG.randrange(2 ** width), width) aux_empirical_weights = [] with context.Simplification(False), context.Cache(False): input_widths = [d.val.width for d in self.input_mask] if list_input_samples is not None: assert len(list_input_samples) == num_input_samples def get_next_input(): for pt in list_input_samples: assert len(self.input_mask) == len(pt) yield [core.Constant(p_i, w) for p_i, w in zip(pt, input_widths)] elif num_input_samples == 2 ** sum(input_widths): # input_samples == whole input space def get_next_input(): for input_sample in itertools.product(*[range(2**w) for w in input_widths]): yield [core.Constant(p_i, w) for p_i, w in zip(input_sample, input_widths)] else: def get_next_input(): for _ in range(num_input_samples): yield [get_random_bv(mask.val.width) for mask in self.input_mask] found_external_vars = len(self.external_masks) > 0 if found_external_vars: external_widths = [d.val.width for d in self.external_masks] if num_external_samples == 2 ** sum(external_widths): # external_samples == whole external space external_space = itertools.product(*[range(2**w) for w in external_widths]) else: external_space = range(num_external_samples) # num_external_samples > 0 def get_next_ssa_external_fixed(): for external_sample in external_space: ssa = self.ch_model.ssa.copy() v2c = collections.OrderedDict() if isinstance(external_sample, int): for v in ssa.external_vars: v2c[v] = get_random_bv(v.width) else: for i_v, v in enumerate(ssa.external_vars): v2c[v] = core.Constant(external_sample[i_v], external_widths[i_v]) for outvar in ssa.assignments: ssa.assignments[outvar] = ssa.assignments[outvar].xreplace(v2c) ssa.external_vars = [] # print("# new external samples") # print("v2c:", v2c) # print("external_masks:", self.external_masks) # print("base ssa:", self.ch_model.ssa) # print("ssa:", ssa) yield ssa gen_next_ssa_external_fixed = get_next_ssa_external_fixed() for _ in range(max(1, num_external_samples)): # num_external_samples can be 0 num_right_inputs = 0 if found_external_vars: ssa = next(gen_next_ssa_external_fixed) # print("> ssa:", ssa) for pt in get_next_input(): if not found_external_vars: # func() faster than SSA.eval() try: ct = self.ch_model.func(*pt) except ValueError as e: if str(e).startswith("expected a tuple of Constant values returned"): # func might return redundant symbolic outputs (e.g., k ^ k = 0) ct = self.ch_model.ssa.eval(pt) else: raise e else: ct = ssa.eval(pt) if not all(isinstance(x, core.Constant) for x in ct): raise ValueError(f"found symbolic output in ct={ct}") # lhs = <a,x>, rhs= <f(x), b> lhs = self.input_mask[0].apply(pt[0]) for i in range(1, len(pt)): lhs ^= self.input_mask[i].apply(pt[i]) rhs = self.output_mask[0].apply(ct[0]) for i in range(1, len(ct)): rhs ^= self.output_mask[i].apply(ct[i]) if lhs == rhs: num_right_inputs += 1 # print("pt:", pt) # print("ct:", ct) # print("lhs:", lhs) # print("rhs:", rhs) # print() aux_empirical_weights.append( self._num_right_inputs2weight(num_right_inputs, num_input_samples) ) return aux_empirical_weights def _get_empirical_data_complexity(self, num_input_samples, num_external_samples): if num_input_samples is None: # data complexity = small_ct * c^(-2), c == ch. correlation # 2 w = 2 x -log2(c) = log2(c^(-2)) ==> 2**(2 w) = c^(-2) p = 2 ** (2 * (self.ch_weight + self.ch_model.error())) # small_ct == number of input bits small_ct = sum(d.val.width for d in self.ch_model.input_mask) num_input_samples = int(math.ceil(small_ct * p)) num_input_samples = max( 1, min(num_input_samples, 2**sum(d.val.width for d in self.ch_model.input_mask))) if not self.external_masks: num_external_samples = 0 else: if num_external_samples is None: # an external sample for each external bit num_external_samples = sum([d.width for d in self.ch_model.external_var2mask]) num_external_samples = max( 1, min(num_external_samples, 2 ** sum(d.width for d in self.ch_model.external_var2mask))) return num_input_samples, num_external_samples
[docs] def compute_empirical_ch_weight(self, num_input_samples=None, num_external_samples=None, split_by_max_weight=None, split_by_rounds=False, seed=None, C_code=False, num_parallel_processes=None): """Compute and store the empirical weight. The main description of this method can be read from `abstractproperty.characteristic.Characteristic.compute_empirical_ch_weight`, simply by replacing `Property` by `LinearMask` and input-output pair by hull. The basic subroutine in this case consists of computing the fraction of right inputs for ``num_input_samples`` sampled inputs. An input :math:`x` is a right input if :math:`\langle \\alpha, x \\rangle = \\langle \\beta, f(x) \\rangle`, where :math:`\\alpha` is `input_mask`, :math:`\\beta` is `output_mask` and :math:`f` is the underlying bit-vector function. >>> from cascada.bitvector.core import Constant >>> from cascada.bitvector.operation import RotateLeft, RotateRight >>> from cascada.linear.mask import LinearMask, LinearMask >>> from cascada.linear.chmodel import ChModel >>> from cascada.linear.characteristic import Characteristic >>> from cascada.primitives import speck >>> Speck32_KS = speck.get_Speck_instance(speck.SpeckInstance.speck_32_64).key_schedule >>> Speck32_KS.set_num_rounds(2) >>> ch_model = ChModel(Speck32_KS, LinearMask, ["mk0", "mk1", "mk2"]) >>> zm = core.Constant(0, width=16) >>> mk0 = RotateLeft(Constant(1, width=16), 8) # mk0 >>> 7 == 0b0···010 >>> mx5 = RotateRight(mk0, 7) # ~ mk0 >>> 7 == 0b0···010 >>> mx3__1 = RotateRight(Constant(1, width=16), 1) # mx3__1 <<< 2 == 0b0···010 >>> mx3_out = core.Constant(0x8002, 16) >>> assign_outmask_list = [zm, zm, zm, zm, mx5, mx3__1, mx5, mx5, zm, mx3_out, mx5] >>> ch = Characteristic([mk0, zm, zm], [zm, mx3_out, mx5], assign_outmask_list, ch_model) >>> ch.compute_empirical_ch_weight(seed=0) >>> ch.empirical_ch_weight Decimal('1.093109404391481470675941626') >>> for data in ch.empirical_data_list: print(data) # doctest: +NORMALIZE_WHITESPACE EmpiricalWeightData(weight_avg_aux_prs=1.093109404391481470675941626, num_aux_weights=1, num_inf_aux_weights=0, num_input_samples=192, seed=0, C_code=False) >>> ch.compute_empirical_ch_weight(seed=0, C_code=True) >>> ch.empirical_ch_weight Decimal('1.061400544664143309159590699') """ super().compute_empirical_ch_weight( num_input_samples=num_input_samples, num_external_samples=num_external_samples, split_by_max_weight=split_by_max_weight, split_by_rounds=split_by_rounds, seed=seed, C_code=C_code, num_parallel_processes=num_parallel_processes)
[docs]class EncryptionCharacteristic(abstractproperty.characteristic.EncryptionCharacteristic, Characteristic): """Represent linear characteristics over encryption functions. Given a `Cipher`, an `EncryptionCharacteristic` is a linear characteristic (see `Characteristic`) over the `Cipher.encryption` (where the `Cipher.key_schedule` is ignored and round key masks are given as constant `LinearMask`). The propagation probability of an `EncryptionCharacteristic` is similar to the expected linear probability (ELP), but instead of multiplying the absolute correlations, the ELP multiplies the square of the correlations (see https://eprint.iacr.org/2005/212). >>> from cascada.bitvector.core import Constant >>> from cascada.bitvector.operation import RotateRight, RotateLeft >>> from cascada.linear.mask import LinearMask >>> from cascada.linear.chmodel import EncryptionChModel >>> from cascada.linear.characteristic import EncryptionCharacteristic >>> from cascada.primitives import speck >>> Speck32 = speck.get_Speck_instance(speck.SpeckInstance.speck_32_64) >>> Speck32.set_num_rounds(2) >>> ch_model = EncryptionChModel(Speck32, LinearMask) >>> zm = core.Constant(0, width=16) >>> p1 = core.Constant(0x0600, width=16) >>> c0 = core.Constant(0x60f0, width=16) >>> c1 = core.Constant(0x60c0, width=16) >>> k1 = core.Constant(0x0030, width=16) >>> bv_1800 = core.Constant(0x1800, width=16) >>> assign_outmask_list = [zm, zm, zm, bv_1800, bv_1800, k1, k1, k1, c1, c1, c0, c1] >>> ch = EncryptionCharacteristic([zm, p1], [c0, c1], assign_outmask_list, ch_model, [zm, k1]) >>> ch # doctest: +NORMALIZE_WHITESPACE EncryptionCharacteristic(ch_weight=1, assignment_weights=[0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0], input_mask=[0x0000, 0x0600], output_mask=[0x60f0, 0x60c0], external_masks=[0x0000, 0x0030], assign_outmask_list=[0x0000, 0x0000, 0x0000, 0x1800, 0x1800, 0x0030, 0x0030, 0x0030, 0x60c0, 0x60c0, 0x60f0, 0x60c0]) >>> list(zip(ch.assignment_weights, ch.tuple_assign_outmask2op_model)) # doctest: +NORMALIZE_WHITESPACE [(Decimal('0'), (LinearMask(0x0000), LinearModelFreeBranch(LinearMask(0x0600)))), (Decimal('0'), (LinearMask(0x0000), LinearModelBvAdd([LinearMask(0x0000), LinearMask(0x0000)]))), (Decimal('0'), (LinearMask(0x0000), LinearModelBvXor([LinearMask(0x0000), LinearMask(0x0000)]))), (Decimal('0'), (LinearMask(0x1800), LinearModelFreeBranch(LinearMask(0x0000)))), (Decimal('0'), (LinearMask(0x1800), LinearModelBvXor([LinearMask(0x1800), LinearMask(0x1800)]))), (Decimal('0'), (LinearMask(0x0030), LinearModelFreeBranch(LinearMask(0x1800)))), (Decimal('1'), (LinearMask(0x0030), LinearModelBvAdd([LinearMask(0x0030), LinearMask(0x0030)]))), (Decimal('0'), (LinearMask(0x0030), LinearModelBvXor([LinearMask(0x0030), LinearMask(0x0030)]))), (Decimal('0'), (LinearMask(0x60c0), LinearModelFreeBranch(LinearMask(0x0030)))), (Decimal('0'), (LinearMask(0x60c0), LinearModelBvXor([LinearMask(0x60c0), LinearMask(0x60c0)]))), (Decimal('0'), (LinearMask(0x60f0), LinearModelId(LinearMask(0x60f0)))), (Decimal('0'), (LinearMask(0x60c0), LinearModelId(LinearMask(0x60c0))))] """ def __init__(self, input_mask, output_mask, assign_outmask_list, ch_model, external_masks=None, free_masks=None, empirical_ch_weight=None, empirical_data_list=None, is_valid=True): assert isinstance(ch_model, cascada_chmodel.EncryptionChModel) if ch_model.ssa.external_vars: assert external_masks is not None super().__init__( input_mask, output_mask, assign_outmask_list, ch_model, external_props=external_masks, free_props=free_masks, empirical_ch_weight=empirical_ch_weight, empirical_data_list=empirical_data_list, is_valid=is_valid)