from __future__ import annotations
import datetime
import logging
import pprint
import traceback
from collections import Counter, OrderedDict, defaultdict
from dataclasses import dataclass, replace, field
from typing import Callable, DefaultDict, Dict, List, TYPE_CHECKING
import progressbar
from ._util import letterRE
if TYPE_CHECKING:
from .dictionary import Dictionary
from .tokens import Token
from .tokens.list import TokenList
# One-letter shorthand codes mapped to the full heuristic names that are
# stored on a bin's ``heuristic`` attribute.
_heuristics_map = dict(
    a='annotator',
    o='original',
    k='kbest',
    d='kdict',
)
class Heuristics(object):
    """
    Sorts tokens into the module-level bins and gathers statistics about them.

    Bin configuration and the per-bin counters live in the module-level
    ``_bins`` registry, so they are shared by every instance of this class.
    The scalar counters (``totalCount``, ``tokenCount``, ...) are per instance.
    """

    log = logging.getLogger(f'{__name__}.Heuristics')

    @classmethod
    def bin(cls, n: int) -> Bin:
        """
        Return a copy of the bin registered under number ``n``.

        :param n: Bin number (a key of the module-level bin registry).
        :raises KeyError: If no bin has that number.
        """
        return _bins[n]._copy()

    def __init__(self, settings: Dict[int, str], dictionary):
        """
        :param settings: A dictionary of ``bin number`` => ``heuristic`` settings.
        :param dictionary: A dictionary for determining correctness of :class:`Tokens<CorrectOCR.tokens.Token>` and suggestions.
        :raises KeyError: If a setting is neither a full heuristic name nor a
            known one-letter code, or if the bin number is unknown.
        """
        for (_bin, code) in settings.items():
            if code not in _heuristics_map.values():
                Heuristics.log.warning(f'Unknown heuristic for bin {_bin}! Must be one of {_heuristics_map.values()}')
                code = _heuristics_map[code] # attempt to get valid heuristic
            # NOTE: this reconfigures the shared module-level bin.
            _bins[int(_bin)].heuristic = code
        for (number, _bin) in _bins.items():
            _bin.number = number
        Heuristics.log.debug(f'Bins: {_bins}')
        self.dictionary = dictionary
        self.documentCount = 0
        self.tokenCount = 0
        self.totalCount = 0
        self.punctuationCount = 0
        self.hyphenatedCount = 0
        self.malformedTokens = []
        self.nogoldCount = 0
        self.oversegmented = 0
        self.undersegmented = 0
        self.summary = Counter()

    @staticmethod
    def _share(part, whole):
        """Return ``part / whole``, or ``0.0`` when ``whole`` is zero."""
        return part / whole if whole else 0.0

    def bin_for_word(self, original, kbest):
        """
        Find the first bin whose matcher accepts the word and derive the selection.

        :param original: The original string.
        :param kbest: Mapping of ``k`` => candidate item; each item is read via
            its ``candidate`` attribute, and key ``1`` is the top candidate.
        :return: A tuple ``(heuristic, selection, bin)`` where ``bin`` is a copy
            of the matched bin and ``selection`` depends on the heuristic:
            the original string for ``'original'``, ``1`` for ``'kbest'``, the
            first key of a candidate found in the dictionary for ``'kdict'``,
            and the list of all such keys for ``'annotator'``.
        :raises ValueError: If no bin matches, or the bin has an unknown heuristic.
        """
        # keys of the k-best candidates which are in dictionary
        filtids = [n for n, item in kbest.items() if item.candidate in self.dictionary]
        if len(filtids) == 0:
            dcode = 'zerokd'
        elif len(filtids) < len(kbest):
            dcode = 'somekd'
        else:
            dcode = 'allkd'
        token_bin = None
        for _bin in _bins.values():
            if _bin.matcher(original, kbest[1].candidate, self.dictionary, dcode):
                token_bin = _bin._copy()
                break
        if token_bin is None:
            # The original message interpolated an undefined name, which
            # turned this condition into a NameError.
            raise ValueError(f'No bin matched for: {original}')
        if token_bin.heuristic == 'original':
            selection = original
        elif token_bin.heuristic == 'kbest':
            selection = 1
        elif token_bin.heuristic == 'kdict':
            # NOTE(review): raises IndexError when no candidate is in the
            # dictionary, i.e. when a bin that can match 'zerokd' words is
            # configured with the 'kdict' heuristic.
            selection = filtids[0]
        elif token_bin.heuristic == 'annotator':
            selection = filtids
        else:
            raise ValueError(f'Bin {token_bin} has an unknown heuristic: {token_bin.heuristic}')
        return token_bin.heuristic, selection, token_bin

    def bin_tokens(self, tokens: TokenList, force = False) -> bool:
        """
        Assign a heuristic, selection and bin to every token that lacks a bin.

        :param tokens: The tokens to bin; iterated via ``tokens.consolidated``.
        :param force: If true, tokens that already have a bin are binned again.
        :return: ``True`` if at least one token was (re)binned.
        :raises ValueError: If a token ends up without a valid bin.
        """
        Heuristics.log.info('Running heuristics on tokens to determine annotator workload.')
        modified_count = 0
        counts = Counter()
        annotatorRequired = 0
        for _, _, token in progressbar.progressbar(tokens.consolidated, max_value=len(tokens)):
            if force or token.bin is None:
                token.heuristic, token.selection, token.bin = self.bin_for_word(token.original, token.kbest)
                if token.is_hyphenated:
                    # The following token receives the same decision as the
                    # hyphenated one.
                    next_token = tokens[token.index+1]
                    next_token.heuristic = token.heuristic
                    next_token.selection = token.selection
                    next_token.bin = token.bin
                modified_count += 1
            if token.heuristic is None or token.bin is None or token.selection is None:
                raise ValueError(f'Token {token} was not binned!')
            # ``token.bin == -1`` is checked first so a bare -1 is reported
            # as unbinned rather than failing on ``.number``.
            if token.bin == -1 or token.bin.number == -1:
                raise ValueError(f'Token {token} was not binned!')
            counts[token.bin.number] += 1
            if token.heuristic == 'annotator':
                annotatorRequired += 1
        Heuristics.log.debug(f'Counts for each bin: {counts}')
        Heuristics.log.info(f'Set bin for {modified_count} tokens. Annotator is required for {annotatorRequired} of {len(tokens)} tokens.')
        return modified_count > 0

    def add_to_report(self, tokens, rebin=False, hmm=None):
        """
        Add the tokens of one document to the statistics used by :meth:`report`.

        Tokens whose processing raises an exception are logged and collected in
        ``self.malformedTokens`` instead of aborting the run.

        :param tokens: The tokens to evaluate; iterated via ``tokens.consolidated``.
        :param rebin: If true, each token is binned again using fresh *k*-best
            candidates from ``hmm`` instead of its stored bin.
        :param hmm: Object providing ``kbest_for_word(original, k)``; required
            when ``rebin`` is true.
        :raises ValueError: If ``rebin`` is true but no ``hmm`` was given.
        """
        if rebin and hmm is None:
            # Without this check every token would fail inside the loop and
            # be reported as malformed.
            raise ValueError('Cannot rebin tokens: No hmm was provided!')
        self.documentCount += 1
        if rebin:
            Heuristics.log.info(f'Will rebin {len(tokens)} tokens for comparison.')
        for original, gold, token in progressbar.progressbar(tokens.consolidated, max_value=len(tokens)):
            try:
                self.totalCount += 1
                if token.is_hyphenated:
                    self.hyphenatedCount += 1
                if token.is_punctuation():
                    self.punctuationCount += 1
                    continue
                # if the token or gold column is empty, a word segmentation error probably occurred in the original
                # (though possibly a deletion)
                # don't count any other errors here; they will be counted in the segmentation error's other half.
                if original == '' and gold:
                    # ``gold`` may be None (see the no-gold check below), so
                    # test truthiness instead of calling len() on it.
                    self.undersegmented += 1 # words ran together in original / undersegmentation
                    continue
                if gold == '' and len(original) > 0:
                    self.oversegmented += 1 # word wrongly broken apart in original / oversegmentation
                    continue
                if gold is None or len(gold) == 0:
                    self.nogoldCount += 1
                # total number of real tokens - controlled for segmentation errors
                self.tokenCount += 1
                if rebin:
                    kbest = hmm.kbest_for_word(token.original, token.k)
                    _, _, token_bin = self.bin_for_word(token.original, kbest)
                    bin_number = token_bin.number
                else:
                    kbest = token.kbest
                    bin_number = token.bin.number
                target = _bins[bin_number]
                # Keep the first sufficiently long word containing a letter
                # as the example shown for this bin.
                if target.example is None and len(original) > 3 and letterRE.search(original):
                    target.example = (original, gold, kbest)
                counts = target.counts
                counts['total'] += 1
                counts['previous'] = counts.get('previous', defaultdict(int))
                if token.bin and bin_number != token.bin.number:
                    counts['previous'][f'bin {token.bin.number}'] += 1
                    counts['previous']['total'] += 1
                if original == gold:
                    counts['(A) gold == orig'] += 1
                if kbest[1].candidate == gold:
                    counts['(B) gold == k1'] += 1
                # lower k best candidate words that pass the dictionary check
                kbest_filtered = [item.candidate for (k, item) in kbest.items() if item.candidate in self.dictionary and k > 1]
                if gold in kbest_filtered:
                    counts['(C) gold == lower kbest'] += 1
                if token.heuristic:
                    counts[f'(D) heuristic was {token.heuristic}'] += 1
                    if token.heuristic == 'annotator':
                        if gold == original:
                            counts['(E) Annotator accepted the original'] += 1
                        elif gold == kbest[1].candidate:
                            counts['(E) Annotator chose the top candidate'] += 1
                        elif any(gold == item.candidate for item in kbest.values()):
                            counts['(E) Annotator chose a lower candidate'] += 1
                        else:
                            counts['(E) Annotator made a novel correction'] += 1
            except Exception:
                # Deliberate best-effort: one bad token must not abort the report.
                Heuristics.log.error(f'Malformed token: {token}:\n{traceback.format_exc()}')
                self.malformedTokens.append(token)
                continue

    def report(self) -> str:
        """
        Render the collected statistics as a plain-text report.

        The bin counters are read from copies, so the report can be generated
        repeatedly without changing the collected statistics.

        :return: The report text.
        :raises ValueError: If no tokens have been added.
        """
        if self.totalCount == 0:
            raise ValueError('Cannot generate report: No tokens were added!')
        Heuristics.log.debug(f'{[(i, b.counts) for i,b in _bins.items()]}')
        share = Heuristics._share
        out = [f'CorrectOCR Report for {datetime.datetime.now().isoformat()}\n\n']
        out.append(f'Total documents included in evaluation: {self.documentCount:10d} '.rjust(60) + '\n\n')
        out.append(f'Total tokens included in evaluation: {self.totalCount:10d} '.rjust(60) + '\n\n')
        out.append(f'Tokens without gold correction: {self.nogoldCount:10d} ({self.nogoldCount/self.totalCount:6.2%})'.rjust(60) + '\n\n')
        out.append(f'Oversegmented: {self.oversegmented:10d} ({self.oversegmented/self.totalCount:6.2%})'.rjust(60) + '\n')
        out.append(f'Undersegmented: {self.undersegmented:10d} ({self.undersegmented/self.totalCount:6.2%})'.rjust(60) + '\n')
        out.append(f'Hyphenated: {self.hyphenatedCount:10d} ({self.hyphenatedCount/self.totalCount:6.2%})'.rjust(60) + '\n')
        out.append(f'Malformed: {len(self.malformedTokens):10d} ({len(self.malformedTokens)/self.totalCount:6.2%})'.rjust(60) + '\n')
        out.append(f'Tokens that are punctuation: {self.punctuationCount:10d} ({self.punctuationCount/self.totalCount:6.2%})'.rjust(60) + '\n\n')
        out.append(f'Tokens available for evaluation: {self.tokenCount:10d} ({self.tokenCount/self.totalCount:6.2%})'.rjust(60) + '\n\n')
        summary = Counter()
        for num, _bin in _bins.items():
            # Work on a copy: popping from the shared counters would corrupt
            # every later call to report().
            counts = dict(_bin.counts)
            total = counts.pop('total', 0)
            previous = counts.pop('previous', {})
            # tokenCount can be zero when every token was punctuation or a
            # segmentation error, hence the guarded division.
            out.append(f'BIN {num}\t\t {total:10d} tokens ({share(total, self.tokenCount):6.2%} of total)\n')
            out.append(_bin.description + '\n')
            out.append(f'Current heuristic: {_bin.heuristic}\n')
            if len(counts) > 0:
                for name, count in sorted(counts.items(), key=lambda x: x[0]):
                    out.append(f'{name:30}: {count:10d}'.rjust(50) + f' ({share(count, total):6.2%})\n')
                    summary[name] += count
            else:
                out.append('\tNo tokens matched.\n')
            if len(previous) > 0:
                out.append('\nNumber of previously binned tokens that\n')
                out.append('move to this bin with the current model :\n')
                for name, count in sorted(previous.items(), key=lambda x: x[0]):
                    out.append(f'{name:30}: {count:10d}'.rjust(50) + f' ({share(count, total):6.2%})\n')
            if _bin.example:
                (original, gold, kbest) = _bin.example
                out.append('Example:\n')
                inDict = ' * is in dictionary' if original in self.dictionary else ''
                out.append(f'\toriginal = {original}{inDict}\n')
                inDict = ' * is in dictionary' if gold is not None and gold in self.dictionary else ''
                out.append(f'\tgold = {gold}{inDict}\n')
                out.append('\tkbest = [\n')
                for k, item in kbest.items():
                    inDict = ' * is in dictionary' if item.candidate in self.dictionary else ''
                    out.append(f'\t\t{k}: {item.candidate} ({item.probability:.2e}){inDict}\n')
                out.append('\t]\n')
            out.append('\n\n\n')
        out.append('Summary of annotations:\n')
        for name, count in sorted(summary.items(), key=lambda x: x[0]):
            out.append(f'{name:30}: {count:10d}'.rjust(60) + '\n')
        if len(self.malformedTokens) > 0:
            out.append('\n\n\nThere were some malformed tokens:\n\n')
            for token in self.malformedTokens:
                # pformat returns the text; pprint.pprint would print to
                # stdout and put "None" into the report.
                out.append(f'{pprint.pformat(vars(token))}\n\n')
        return ''.join(out)
##########################################################################################
@dataclass
class Bin:
    """
    Heuristics bin: a description, a matcher deciding whether a word belongs
    in the bin, the heuristic configured for it, and counters used for
    reporting.

    TODO TABLE
    """
    description: str
    """Description of bin"""
    matcher: Callable[[str, str, Dictionary, str], bool]
    """Function or lambda which returns `True` if a given :class:`CorrectOCR.tokens.Token` fits into the bin, or `False` otherwise.

    :param o: Original string
    :param k: *k*-best candidate string
    :param d: Dictionary
    :param dcode: One of 'zerokd', 'somekd', 'allkd' for whether zero, some, or all other *k*-best candidates are in dictionary
    """
    heuristic: str = 'annotator'
    """
    Which heuristic the bin is set up for, one of:

    - 'annotator' = Defer to annotator.
    - 'original' = Select original.
    - 'kbest' = Select top *k*-best.
    - 'kdict' = Select top *k*-best in dictionary.
    """
    number: int | None = None #: The number of the bin.
    counts: DefaultDict[str, int] = field(default_factory=lambda: defaultdict(int)) #: Statistics used for reporting.
    example: tuple | None = None #: An example of a match as ``(original, gold, kbest)``, used for reporting.

    def _copy(self):
        """
        Return a new :class:`Bin` with the same field values.

        The copy is shallow: the new bin refers to the same ``counts`` object
        as this one.
        """
        return replace(self)
##########################################################################################
# Registry of all bins, keyed by bin number. Matchers are tried in this order
# and the first one returning True wins, so the catch-all bin must stay last.
# Each bin carries its own number from the start, so a copy of a bin is
# numbered even before any settings have been applied.
_bins: Dict[int, Bin] = OrderedDict({
    1: Bin(
        description='k1 == original and both are in dictionary.',
        matcher=lambda o, k, d, dcode: o == k and o in d,
        number=1,
    ),
    2: Bin(
        description='k1 == original but they are not in dictionary, and no other kbest is in dictionary either.',
        matcher=lambda o, k, d, dcode: o == k and o not in d and dcode == 'zerokd',
        number=2,
    ),
    3: Bin(
        description='k1 == original but they are not in dictionary, but some lower-ranked kbest is.',
        matcher=lambda o, k, d, dcode: o == k and o not in d and dcode == 'somekd',
        number=3,
    ),
    4: Bin(
        # Double quotes: the original 'isn''t' was implicit string
        # concatenation and silently dropped the apostrophe.
        description="k1 != original and is in dictionary while original isn't.",
        matcher=lambda o, k, d, dcode: o != k and o not in d and k in d,
        number=4,
    ),
    5: Bin(
        description='k1 != original and nothing is in dictionary.',
        matcher=lambda o, k, d, dcode: o != k and o not in d and dcode == 'zerokd',
        number=5,
    ),
    6: Bin(
        description='k1 != original and neither are in dictionary, but a lower-ranked candidate is.',
        matcher=lambda o, k, d, dcode: o != k and k not in d and o not in d and dcode == 'somekd',
        number=6,
    ),
    7: Bin(
        description='k1 != original and both are in dictionary.',
        matcher=lambda o, k, d, dcode: o != k and o in d and k in d,
        number=7,
    ),
    8: Bin(
        description='k1 != original, original is in dictionary and no candidates are in dictionary.',
        matcher=lambda o, k, d, dcode: o != k and o in d and dcode == 'zerokd',
        number=8,
    ),
    9: Bin(
        description='k1 != original, k1 is not in dictionary but both original and a lower candidate are.',
        matcher=lambda o, k, d, dcode: o != k and o in d and k not in d and dcode == 'somekd',
        number=9,
    ),
    10: Bin(
        description='Catch-all bin, matches any remaining tokens. It is recommended to pass this to annotator.',
        matcher=lambda o, k, d, dcode: True,
        number=10,
    ),
})