# Source code for blocklib.pprllambdafold
import random
from collections import defaultdict
from typing import Dict, Sequence, Any, List, Optional, Union, cast
from .pprlindex import PPRLIndex, ReversedIndexResult
from .encoding import generate_bloom_filter
from .utils import deserialize_filters
from .stats import reversed_index_stats
from .validation import LambdaConfig
class PPRLIndexLambdaFold(PPRLIndex):
    """Class that implements the PPRL indexing technique:

    An LSH-Based Blocking Approach with a Homomorphic Matching Technique for
    Privacy-Preserving Record Linkage.

    This class includes an implementation of Lambda-fold redundant blocking method:
    ``Lambda`` independent tables are built, and in each table a record is
    assigned to the block named by ``K`` sampled bits of its bloom filter.
    """

    def __init__(self, config: Union[LambdaConfig, Dict]) -> None:
        """Initialize the class and set the required parameters.

        :param config: Configuration for the Lambda-fold reversed index, given
            either as a ``LambdaConfig`` or as a dict that is parsed into one.
        """
        if isinstance(config, dict):
            config = LambdaConfig.parse_obj(config)
        config = cast(LambdaConfig, config)
        super().__init__(config)
        self.blocking_features = config.blocking_features
        # Lambda: number of redundant tables
        self.mylambda = config.Lambda
        self.bf_len = config.bloom_filter_length
        self.num_hash_function = config.number_of_hash_functions
        # K: number of base Hamming LSH hashing functions
        self.K = config.K
        # When truthy, the input data is deserialized as ready-made filters
        # instead of being encoded from the blocking features.
        self.input_clks = config.block_encodings
        self.random_state = config.random_state
        self.record_id_col = config.record_id_column

    def __record_to_bf__(self, record: Sequence, blocking_features_index: List[int]):
        """Convert a record to list of bigrams and then map to a bloom filter.

        :param record: a single record; the entries at the blocking feature
            positions are concatenated, so they must be strings
        :param blocking_features_index: positions of the blocking features in ``record``
        :return: the bloom filter produced by ``generate_bloom_filter``
        """
        s = ''.join([record[i] for i in blocking_features_index])
        # Generate the list of bigrams of s; each bigram is hashed to bit
        # positions of the bloom filter by generate_bloom_filter.
        # A string shorter than two characters yields no bigrams at all.
        ngram = 2
        grams = [s[i: i + ngram] for i in range(len(s) - ngram + 1)]
        bloom_filter = generate_bloom_filter(grams, self.bf_len, self.num_hash_function)
        return bloom_filter

    def build_reversed_index(self, data: Sequence[Any],
                             header: Optional[List[str]] = None) -> ReversedIndexResult:
        """Build inverted index for PPRL Lambda-fold blocking method.

        :param data: list of lists
        :param header: file header, optional
        :return: reversed index as ReversedIndexResult
        :raises ValueError: from ``random.sample`` when ``K`` is larger than the
            bloom filter length
        """
        feature_to_index = self.get_feature_to_index_map(data, header)
        self.set_blocking_features_index(self.blocking_features, feature_to_index)

        # create record index lists
        if self.record_id_col is None:
            record_ids = list(range(len(data)))
        else:
            record_ids = [x[self.record_id_col] for x in data]

        # NOTE(review): this reseeds the process-wide random module. It is kept
        # as-is because helpers called below are not visible here and may rely
        # on it - confirm before switching to a private random.Random instance.
        random.seed(self.random_state)

        if self.input_clks:
            clks = deserialize_filters(data)
        else:
            clks = [self.__record_to_bf__(rec, self.blocking_features_index) for rec in data]

        # Use the actual filter length when there is at least one record. With
        # no records fall back to the configured length, so empty input does
        # not fail with an IndexError on clks[0].
        # NOTE(review): confirm reversed_index_stats accepts an empty index.
        bf_len = len(clks[0]) if clks else self.bf_len

        # build Lambda fold tables and add to the invert index
        invert_index = {}  # type: Dict[Any, List[Any]]
        for i in range(self.mylambda):
            lambda_table = defaultdict(list)  # type: Dict[Any, Any]
            # sample K distinct bit positions from [0, bf_len)
            indices = random.sample(range(bf_len), self.K)
            for rec_id, clk in zip(record_ids, clks):
                block_key = ''.join(['1' if clk[ind] else '0' for ind in indices])
                # Prefix the key with the table number so that blocks of
                # different tables stay separate in the merged index.
                lambda_table['{}{}'.format(i, block_key)].append(rec_id)
            invert_index.update(lambda_table)

        return ReversedIndexResult(invert_index, reversed_index_stats(invert_index))