""" Main public API of the package."""
from __future__ import annotations
import typing
import warnings
from collections import defaultdict
from typing import Any
from typing import Callable
from typing import Collection
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union
from iamsystem.fuzzy.abbreviations import Abbreviations
from iamsystem.fuzzy.api import FuzzyAlgo
from iamsystem.fuzzy.api import INormLabelAlgo
from iamsystem.fuzzy.api import ISynsProvider
from iamsystem.fuzzy.api import SynAlgos
from iamsystem.fuzzy.cache import CacheFuzzyAlgos
from iamsystem.fuzzy.exact import ExactMatch
from iamsystem.fuzzy.norm_fun import WordNormalizer
from iamsystem.fuzzy.regex import FuzzyRegex
from iamsystem.fuzzy.simstring import SimStringWrapper
from iamsystem.fuzzy.spellwise import SpellWiseWrapper
from iamsystem.fuzzy.util import SimpleWords2ignore
from iamsystem.keywords.api import IKeyword
from iamsystem.keywords.api import IStoreKeywords
from iamsystem.keywords.collection import Terminology
from iamsystem.keywords.keywords import Keyword
from iamsystem.keywords.util import get_unigrams
from iamsystem.matcher.annotation import Annotation
from iamsystem.matcher.annotation import create_annot
from iamsystem.matcher.annotation import rm_nested_annots
from iamsystem.matcher.annotation import sort_annot
from iamsystem.matcher.api import IMatcher
from iamsystem.matcher.util import IState
from iamsystem.matcher.util import StartState
from iamsystem.matcher.util import TransitionState
from iamsystem.stopwords.api import ISimpleStopwords
from iamsystem.stopwords.api import IStopwords
from iamsystem.stopwords.api import IStoreStopwords
from iamsystem.stopwords.negative import NegativeStopwords
from iamsystem.stopwords.simple import NoStopwords
from iamsystem.stopwords.simple import Stopwords
from iamsystem.tokenization.api import ITokenizer
from iamsystem.tokenization.api import TokenT
from iamsystem.tokenization.tokenize import french_tokenizer
from iamsystem.tokenization.tokenize import tokenize_and_order_decorator
from iamsystem.tree.nodes import EMPTY_NODE
from iamsystem.tree.nodes import INode
from iamsystem.tree.trie import Trie
filter_fun = Callable[[Annotation[TokenT]], Annotation[TokenT]]
[docs]class Matcher(IMatcher[TokenT]):
"""Main public API to perform semantic annotation (aka entity linking)
with iamsystem algorithm."""
[docs] def __init__(
self,
tokenizer: ITokenizer = french_tokenizer(),
stopwords: IStopwords[TokenT] = None,
):
"""Create an IAMsystem matcher to annotate documents.
Prefer :py:meth:`~iamsystem.Matcher.build` method to create a matcher.
:param tokenizer: default :func:`~iamsystem.french_tokenizer`.
A :class:`~iamsystem.ITokenizer` instance responsible for
tokenizing and normalizing.
:param stopwords: a :class:`~iamsystem.IStopwords` to ignore empty
words in keywords and documents.
If None, default to :class:`~iamsystem.Stopwords`.
"""
self._w = 1
self._tokenizer = tokenizer
self._fuzzy_algos: List[FuzzyAlgo[TokenT]] = [ExactMatch()]
self._trie: Trie = Trie()
self._termino: IStoreKeywords = Terminology()
self._remove_nested_annots = True
if stopwords is not None:
self._stopwords = stopwords
else:
self._stopwords = Stopwords()
@property
def stopwords(self) -> IStopwords[TokenT]:
"""Return the :class:`~iamsystem.IStopwords` used by the matcher."""
return self._stopwords
@stopwords.setter
def stopwords(self, stopwords: IStopwords[TokenT]) -> None:
"""Set the stopwords.
Note that keywords already added will not be modified.
:param stopwords: a :class:`~iamsystem.IStopwords` to ignore empty
words in keywords and documents.
:return: None
"""
self._stopwords = stopwords
@property
def tokenizer(self) -> ITokenizer[TokenT]:
"""Return the :class:`~iamsystem.ITokenizer` used by the matcher."""
return self._tokenizer
@tokenizer.setter
def tokenizer(self, tokenizer: ITokenizer[TokenT]) -> None:
"""Change the tokenizer.
Note that keywords already added will not be modified.
:param tokenizer: A :class:`~iamsystem.ITokenizer` instance
responsible for tokenizing and normalizing.
:return: None
"""
self._tokenizer = tokenizer
@property
def remove_nested_annots(self) -> bool:
"""Whether to remove nested annotations. Default to True."""
return self._remove_nested_annots
@remove_nested_annots.setter
def remove_nested_annots(self, remove_nested_annots: bool) -> None:
"""Set remove_nested_annots value. Default to True.
:param remove_nested_annots: if two annotations overlap,
remove the shorter one. Default to True since
longest annotations are often more specific than shorter ones.
:return: None
"""
self._remove_nested_annots = remove_nested_annots
@property
def w(self) -> int:
"""Return the window parameter of this matcher."""
return self._w
@w.setter
def w(self, value: int) -> None:
"""Set the window parameter. Default to 1.
:param value: How much discontinuous keyword's tokens
to find can be. By default, w=1 means the sequence must be
continuous. w=2 means each token can be separated by another token.
:return: None
"""
self._w = value
@property
def fuzzy_algos(self) -> Iterable[FuzzyAlgo[TokenT]]:
"""The fuzzy algorithms used by the algorithm.
:return: :class:`~iamsystem.FuzzyAlgo` instances responsible for
finding possible synonyms for each token of a document.
"""
return self._fuzzy_algos
[docs] def is_token_a_stopword(self, token: TokenT) -> bool:
"""Check if a token is a stopword.
:param token: a generic token that implements
:class:`~iamsystem.IToken`.
:return: True if the token is a stopword.
"""
return self._stopwords.is_token_a_stopword(token=token)
[docs] def is_stopword(self, word: str) -> bool:
"""Return True if word is a stopword."""
if isinstance(self._stopwords, ISimpleStopwords):
return self._stopwords.is_stopword(word=word)
else:
warnings.warn(
f"{self._stopwords.__class__.__name__} "
f"does not implement this method."
)
return False
[docs] def tokenize(self, text: str) -> Sequence[TokenT]:
"""Tokenize a text with the tokenizer's instance.
:param text: a document or a keyword.
:return: A sequence of tokens, the type depends on the tokenizer but
must implement :class:`~iamsystem.IToken` protocol.
"""
return self._tokenizer.tokenize(text=text)
[docs] def add_keywords(
self, keywords: Iterable[Union[str, IKeyword, Dict[Any]]]
) -> None:
"""Utility function to add multiple keywords.
:param keywords: an iterable of string (labels) or
:class:`~iamsystem.IKeyword` to search in a document.
:return: None.
"""
for kw in keywords:
if isinstance(kw, str):
kw = Keyword(label=kw)
self.add_keyword(keyword=kw)
[docs] def add_keyword(self, keyword: IKeyword) -> None:
"""Add a keyword to find in a document.
:param keyword: :class:`~iamsystem.IKeyword` to search in a document.
:return: None.
"""
self._termino.add_keyword(keyword=keyword)
self._trie.add_keyword(
keyword=keyword,
tokenizer=self,
stopwords=self,
)
@property
def keywords(self) -> Collection[IKeyword]:
"""Return the keywords added."""
return self._termino.keywords
[docs] def get_keywords_unigrams(self) -> Set[str]:
"""Get all the unigrams (single words excluding stopwords)
in the keywords."""
return get_unigrams(
keywords=self.keywords,
tokenizer=self,
stopwords=self,
)
[docs] def add_stopwords(self, words: Iterable[str]) -> None:
"""Add words (tokens) to be ignored in :class:`~iamsystem.IKeyword`
and in documents.
:param words: a list of words to ignore.
:return: None.
"""
if isinstance(self._stopwords, IStoreStopwords):
self._stopwords.add(words=words)
else:
warnings.warn(
f"Adding stopwords have no effect on class "
f"{self._stopwords.__class__.__name__}"
)
[docs] def add_fuzzy_algo(self, fuzzy_algo: FuzzyAlgo[TokenT]) -> None:
"""Add a fuzzy algorithms to provide synonym(s) that helps matching
a token of a document and a token of a keyword.
:param fuzzy_algo: a :class:`~iamsystem.FuzzyAlgo` instance.
:return: None.
"""
self._fuzzy_algos.append(fuzzy_algo)
[docs] def get_initial_state(self) -> INode:
"""Return the initial state from which iamsystem algorithm will start
searching for a sequence of keywords'tokens."""
return self._trie.get_initial_state()
[docs] def get_synonyms(
self, tokens: Sequence[TokenT], i: int, w_states: List[List[IState]]
) -> Iterable[SynAlgos]:
"""Get synonyms of a token with configured fuzzy algorithms.
:param tokens: document's tokens.
:param i: the ith token for which synonyms are expected.
:param w_states: algorithm's states.
:return: tuples of synonyms and fuzzy algorithm's names.
"""
syns_collector = defaultdict(list)
for algo in self.fuzzy_algos:
for syn, algo_name in algo.get_synonyms(
tokens=tokens, i=i, w_states=w_states
):
syns_collector[syn].append(algo_name)
synonyms: List[SynAlgos] = list(syns_collector.items())
return synonyms
[docs] def annot_text(self, text: str) -> List[Annotation[TokenT]]:
"""Annotate a document.
:param text: the document to annotate.
:return: a list of :class:`~iamsystem.Annotation`.
"""
tokens: Sequence[TokenT] = self.tokenize(text)
return self.annot_tokens(tokens=tokens)
[docs] def annot_tokens(
self, tokens: Sequence[TokenT]
) -> List[Annotation[TokenT]]:
"""Annotate a sequence of tokens.
:param tokens: an ordered or unordered sequence of tokens.
:return: a list of :class:`~iamsystem.Annotation`.
"""
annots = detect(
tokens=tokens,
w=self.w,
initial_state=self.get_initial_state(),
syns_provider=self,
stopwords=self,
)
if self._remove_nested_annots:
annots = rm_nested_annots(annots=annots, keep_ancestors=False)
return annots
[docs] @classmethod
def build(
cls,
keywords: Iterable[Union[str, IKeyword]],
tokenizer: ITokenizer = None,
stopwords: Union[IStopwords[TokenT], Iterable[str]] = NoStopwords(),
w=1,
order_tokens=False,
negative=False,
remove_nested_annots=True,
string_distance_ignored_w: Optional[Iterable[str]] = None,
abbreviations: Optional[Iterable[Tuple[str, str]]] = None,
spellwise: Optional[List[Dict[Any]]] = None,
simstring: Optional[List[Dict[Any]]] = None,
normalizers: Optional[List[Dict[Any]]] = None,
fuzzy_regex: Optional[List[Dict[Any]]] = None,
) -> Matcher[TokenT]:
"""
Create an IAMsystem matcher to annotate documents.
:param keywords: an iterable of keywords string or
:class:`~iamsystem.IKeyword` instances.
:param tokenizer: default :func:`~iamsystem.french_tokenizer`.
A :class:`~iamsystem.ITokenizer` instance responsible for
tokenizing and normalizing.
:param stopwords: provide a :class:`~iamsystem.IStopwords`.
If None, default to :class:`~iamsystem.Stopwords`.
:param w: Window. How much discontinuous keyword's tokens
to find can be. By default, w=1 means the sequence must be
continuous. w=2 means each token can be separated by another token.
:param order_tokens: order tokens alphabetically if order doesn't
matter in the matching strategy.
:param negative: every unigram not in the keywords is a stopword.
Default to False. If stopwords are also passed, they will be
removed in the unigrams and so still be stopwords.
:param remove_nested_annots: if two annotations overlap,
remove the shorter one. Default to True
:param string_distance_ignored_w: words ignored by string distance
algorithms to avoid false positives matched.
:param abbreviations: an iterable of tuples (short_form, long_form).
:param spellwise: an iterable of :class:`~iamsystem.SpellWiseWrapper`
init parameters. if 'string_distance_ignored_w' is set, these words
parameter will be passed.
:param simstring: an iterable of :class:`~iamsystem.SimStringWrapper`
init parameters. if 'string_distance_ignored_w' is set, these words
parameter will be passed.
:param normalizers: an iterable of :class:`~iamsystem.WordNormalizer`
init parameters.
:param fuzzy_regex: an iterable of :class:`~iamsystem.FuzzyRegex`
init parameters.
"""
# Tokenizer configuration
if tokenizer is None:
tokenizer = french_tokenizer()
if order_tokens:
tokenizer.tokenize = tokenize_and_order_decorator(
tokenizer.tokenize
)
# Start building and configuring the matcher
matcher = Matcher(tokenizer=tokenizer)
# Decorate tokenize function to order alphabetically
matcher.order_tokens = order_tokens
# Configure stopwords
if isinstance(stopwords, Iterable):
matcher.add_stopwords(words=stopwords)
elif isinstance(stopwords, IStopwords):
matcher.stopwords = stopwords
# Configure annot_text function
matcher.w = w
matcher.remove_nested_annots = remove_nested_annots
# Add the keywords
matcher.add_keywords(keywords=keywords)
# add negative stopwords after stopwords and keywords are added
# since this class needs keywords'unigrams without stopwords.
if negative:
matcher.stopwords = NegativeStopwords(
words_to_keep=matcher.get_keywords_unigrams()
)
# fuzzy algorithms parameterization
def _add_algo_in_cache_closure(
cache: CacheFuzzyAlgos, matcher: Matcher
):
"""Internal build function to add cache_fuzzy algorithm to the
list of fuzzy algorithms the first time an algorithm is added in
cache."""
def add_algo_in_cache(algo=INormLabelAlgo):
"""Add an algorithm in cache."""
if cache not in matcher.fuzzy_algos:
matcher.add_fuzzy_algo(fuzzy_algo=cache)
cache.add_algo(algo=algo)
return add_algo_in_cache
cache = CacheFuzzyAlgos()
add_algo_in_cache = _add_algo_in_cache_closure(
cache=cache, matcher=matcher
)
# Abbreviations
if abbreviations is not None:
_abbreviations = Abbreviations(name="abbs")
matcher.add_fuzzy_algo(fuzzy_algo=_abbreviations)
for abb in abbreviations:
short_form, long_form = abb
_abbreviations.add(
short_form=short_form,
long_form=long_form,
tokenizer=matcher.tokenizer,
)
# WordNormalizer
if normalizers is not None:
for params in normalizers:
word_normalizer = WordNormalizer(**params)
word_normalizer.add_words(
words=matcher.get_keywords_unigrams()
)
add_algo_in_cache(algo=word_normalizer)
# FuzzyRegex
if fuzzy_regex is not None:
for params in fuzzy_regex:
fuzzy = FuzzyRegex(**params)
add_algo_in_cache(algo=fuzzy)
if negative:
negative_stopwords = typing.cast(
NegativeStopwords,
matcher.stopwords,
)
negative_stopwords.add_fun_is_a_word_to_keep(
fuzzy.token_matches_pattern
)
# String Distances
# words ignored by string distance algorithms
words2ignore = None
if string_distance_ignored_w is not None:
words2ignore = SimpleWords2ignore(words=string_distance_ignored_w)
# Parameterize spellwise
if spellwise is not None:
for params in spellwise:
# don't override user's 'words2ignore':
if "words2ignore" not in params:
params["words2ignore"] = words2ignore
spellwise = SpellWiseWrapper(**params)
spellwise.add_words(words=matcher.get_keywords_unigrams())
add_algo_in_cache(algo=spellwise)
# Parameterize simstring
if simstring is not None:
for params in simstring:
# don't override user's 'words2ignore':
if "words2ignore" not in params:
params["words2ignore"] = words2ignore
ss_algo = SimStringWrapper(
words=matcher.get_keywords_unigrams(),
**params,
)
add_algo_in_cache(algo=ss_algo)
return matcher
def detect(
tokens: Sequence[TokenT],
w: int,
initial_state: INode,
syns_provider: ISynsProvider,
stopwords: IStopwords,
) -> List[Annotation[TokenT]]:
"""Main internal function that implements iamsystem's algorithm.
Algorithm formalized in https://ceur-ws.org/Vol-3202/livingner-paper11.pdf
:param tokens: a sequence of :class:`~iamsystem.IToken`.
:param w: window, how many previous tokens can the algorithm look at.
:param initial_state: a node/state in the trie, i.e. the root node.
:param syns_provider: a class that provides synonyms for each token.
:param stopwords: an instance of :class:`~iamsystem.IStopwords`
that checks if a token is a stopword.
:return: A list of :class:`~iamsystem.Annotation`.
"""
annots: List[Annotation] = []
# +1 to insert the start_state.
w_states: List[List[IState]] = [[]] * (w + 1)
start_state = StartState(node=initial_state)
# [w] element stores only the start_state. This element is not replaced.
w_states[w] = [start_state]
# different from i for a stopword-independent window size.
count_not_stopword = 0
for i, token in enumerate(tokens):
if stopwords.is_token_a_stopword(token):
continue
count_not_stopword += 1
syns_algos: Iterable[SynAlgos] = syns_provider.get_synonyms(
tokens=tokens, i=i, w_states=w_states
)
# stores matches between document's tokens and keywords'tokens.
tokens_states: List[TransitionState] = []
# 1 to many synonyms depending on fuzzy_algos configuration.
for syn, algos in syns_algos:
# 0 to many states for [0] to [w-1] ; [w] only the start state.
for states in w_states:
for state in states:
new_state = state.node.jump_to_node(syn)
# when no path is found, EMPTY_NODE is returned.
if new_state is EMPTY_NODE:
continue
token_state = TransitionState(
parent=state, node=new_state, token=token, algos=algos
)
tokens_states.append(token_state)
if new_state.is_a_final_state():
annot = create_annot(last_el=token_state)
annots.append(annot)
# function 'count_not_stopword % w' has range [0 ; w-1]
w_states[count_not_stopword % w].clear()
w_states[count_not_stopword % w] = tokens_states
# Mypy: Incompatible types in assignment (expression has type
# "List[TokenState[Any]]", target has type "List[State]")
# but TokenState is a sublcass of State.
sort_annot(annots) # mutate the list like annots.sort()
return annots