Source code for blocklib.pprlpsig
import logging
from collections import defaultdict
from typing import Dict, List, Sequence, Any, Optional, Union, cast
from .encoding import flip_bloom_filter
from .pprlindex import PPRLIndex, ReversedIndexResult
from .signature_generator import generate_signatures
from .stats import reversed_index_per_strategy_stats, reversed_index_stats
from .validation import PSigConfig
class PPRLIndexPSignature(PPRLIndex):
    """Class that implements the PPRL indexing technique:

        Reference: Scalable entity resolution using probabilistic signatures
        on parallel databases.

    This class includes an implementation of the p-sig algorithm.
    """

    def __init__(self, config: Union[PSigConfig, Dict]) -> None:
        """Initialize the class and set the required parameters.

        Arguments:
        - config: Configuration for the P-Sig reversed index, either as a
                  ``PSigConfig`` instance or as a plain dict which is parsed
                  into one.
        """
        if isinstance(config, dict):
            config = PSigConfig.parse_obj(config)
        config = cast(PSigConfig, config)
        super().__init__(config)
        self.blocking_features = config.blocking_features
        self.filter_config = config.filter
        self.blocking_config = config.blocking_filter
        self.signature_strategies = config.signatures
        self.rec_id_col = config.record_id_column

    def build_reversed_index(self, data: Sequence[Sequence], header: Optional[List[str]] = None):
        """Build the inverted index for the given records with the P-Sig method.

        Arguments:
        - data: The records to block; one sequence of feature values per record.
        - header: Optional column names, forwarded to ``get_feature_to_index_map``.

        Returns:
            A ``ReversedIndexResult`` holding the mapping
            ``{bloom filter bit positions (as str) -> record ids}`` and a
            statistics dict that additionally carries the keys
            ``'statistics_per_strategy'`` and ``'coverage'``.

        Raises:
            ValueError: if no block survives the configured filter.
        """
        feature_to_index = self.get_feature_to_index_map(data, header)
        self.set_blocking_features_index(self.blocking_features, feature_to_index)

        # Build index of records: positional ids unless an id column is configured.
        if self.rec_id_col is None:
            record_ids = list(range(len(data)))
        else:
            record_ids = [x[self.rec_id_col] for x in data]

        # One inverted index {signature -> record ids} per signature strategy.
        reversed_index_per_strategy: List[Dict[str, List[Any]]] = [
            defaultdict(list) for _ in self.signature_strategies
        ]
        for rec_id, dtuple in zip(record_ids, data):
            signatures = generate_signatures(self.signature_strategies, dtuple, feature_to_index)
            for i, signature in enumerate(signatures):
                reversed_index_per_strategy[i][signature].append(rec_id)

        reversed_index_per_strategy = [
            self.filter_reversed_index(data, reversed_index)
            for reversed_index in reversed_index_per_strategy
        ]

        strategy_stats = reversed_index_per_strategy_stats(reversed_index_per_strategy, len(data))

        # Combine the per-strategy indices into one. Merge into a *fresh* dict:
        # updating reversed_index_per_strategy[0] in place would silently
        # overwrite the first strategy's index with the combined one.
        filtered_reversed_index: Dict[str, List[Any]] = {}
        for rev_idx in reversed_index_per_strategy:
            filtered_reversed_index.update(rev_idx)

        # check if final inverted index is empty
        if not filtered_reversed_index:
            raise ValueError('P-Sig: All records are filtered out!')

        # Compute coverage: the fraction of records that are in at least one block.
        entities = set()
        for recids in filtered_reversed_index.values():
            entities.update(recids)
        coverage = len(entities) / len(record_ids)
        if coverage < 1:
            logging.warning(
                f'The P-Sig configuration leads to incomplete coverage ({round(coverage * 100, 2)}%)!\n'
                f'This means that not all records are part of at least one block. You can increase coverage by '
                f'adjusting the filter to be less aggressive or by finding signatures that produce smaller block sizes.'
            )

        # Map the signatures of the combined index into bloom filter positions.
        num_hash_func = self.blocking_config.number_of_hash_functions
        bf_len = self.blocking_config.bloom_filter_length
        reversed_index: Dict[str, List[Any]] = {}
        for signature, rec_ids in filtered_reversed_index.items():
            bf_set = str(tuple(flip_bloom_filter(signature, bf_len, num_hash_func)))
            # Extend a list owned by the result instead of storing `rec_ids`
            # itself, so that colliding bloom filter keys never mutate the
            # lists of the per-strategy indices.
            reversed_index.setdefault(bf_set, []).extend(rec_ids)

        # create some statistics around the blocking results
        stats = reversed_index_stats(reversed_index)
        stats['statistics_per_strategy'] = strategy_stats
        stats['coverage'] = coverage
        return ReversedIndexResult(reversed_index, stats)

    def filter_reversed_index(self, data: Sequence[Sequence], reversed_index: Dict):
        """Drop the blocks whose size is outside the configured bounds.

        Both bounds are exclusive. For filter type ``"ratio"`` the configured
        ``min``/``max`` are multiplied by the number of records in ``data``;
        for ``"count"`` they are compared to the block size directly.

        Arguments:
        - data: The records the index was built from; only its length is used.
        - reversed_index: Mapping ``{signature -> record ids}`` to filter.

        Returns:
            A new dict that contains only the blocks that passed the filter.

        Raises:
            NotImplementedError: if the configured filter type is neither
            ``"ratio"`` nor ``"count"``.
        """
        n = len(data)

        # filter blocks based on filter type
        filter_type = self.filter_config.type
        if filter_type == "ratio":
            min_occur_ratio = self.filter_config.min
            max_occur_ratio = self.filter_config.max
            reversed_index = {k: v for k, v in reversed_index.items()
                              if n * max_occur_ratio > len(v) > n * min_occur_ratio}
        elif filter_type == "count":
            min_occur_count = self.filter_config.min
            max_occur_count = self.filter_config.max
            reversed_index = {k: v for k, v in reversed_index.items()
                              if max_occur_count > len(v) > min_occur_count}
        else:
            raise NotImplementedError("Don't support {} filter yet.".format(filter_type))
        return reversed_index