Source code for CorrectOCR.workspace

from __future__ import annotations

import logging
import re
import requests
import urllib.parse
from pathlib import Path, PurePath
from pprint import pformat
from typing import Any, Dict, Iterator, List, Tuple, Union

import progressbar

from ._cache import LRUCache, cached
from .aligner import Aligner
from .dictionary import Dictionary
from .fileio import FileIO
from .heuristics import Heuristics
from .model import HMM
from .tokens import Tokenizer, TokenList, tokenize_str


class Workspace(object):
    """
    The Workspace holds references to :class:`Documents<CorrectOCR.workspace.Document>` and
    resources used by the various :mod:`commands<CorrectOCR.commands>`.

    :param workspaceconfig: An object with the following properties:

       -  **nheaderlines** (:class:`int`): The number of header lines in corpus texts.
       -  **language**: A language instance from `pycountry <https://pypi.org/project/pycountry/>`_.
       -  **originalPath** (:class:`Path<pathlib.Path>`): Directory containing the original docs.
       -  **goldPath** (:class:`Path<pathlib.Path>`): Directory containing the gold (if any) docs.
       -  **trainingPath** (:class:`Path<pathlib.Path>`): Directory for storing intermediate docs.
       -  **correctedPath** (:class:`Path<pathlib.Path>`): Directory for saving corrected docs.

    :param resourceconfig: Passed directly to :class:`ResourceManager<CorrectOCR.workspace.ResourceManager>`,
       see this for further info.
    :param storageconfig: TODO
    """
    log = logging.getLogger(f'{__name__}.Workspace')

    def __init__(self, workspaceconfig, resourceconfig, storageconfig):
        self.config = workspaceconfig
        self.storageconfig = storageconfig
        self.root = self.config.rootPath.resolve()
        self._originalPath = self.root.joinpath(self.config.originalPath)
        self._trainingPath = self.root.joinpath(self.config.trainingPath)  # referenced by cleanup() below
        Workspace.log.info(f'Workspace configuration:\n{pformat(vars(self.config))} at {self.root}')
        Workspace.log.info(f'Storage configuration:\n{pformat(vars(self.storageconfig))}')
        self.nheaderlines: int = self.config.nheaderlines
        self.resources = ResourceManager(self.root, resourceconfig)
        self.docs: Dict[str, Document] = dict()
        Workspace.log.info(f'Adding documents from: {self._originalPath}')
        for file in self._originalPath.iterdir():
            if file.name in {'.DS_Store'}:
                continue
            self.add_doc(file)
        Workspace.log.info(f'Workspace documents: {self.docs}')
        self.cache = LRUCache(maxsize=1000)
        self.cachePath = FileIO.cachePath
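    # A minimal construction sketch (not part of the original module). The three
    # config objects are normally namespaces built by CorrectOCR's CLI; the field
    # values below are hypothetical, and resourceconfig/storageconfig are elided
    # since their exact fields are defined elsewhere.
    #
    #   from types import SimpleNamespace
    #   workspaceconfig = SimpleNamespace(
    #       rootPath=Path('./workspace'), originalPath='original/',
    #       goldPath='gold/', trainingPath='training/', correctedPath='corrected/',
    #       nheaderlines=0, language=pycountry.languages.get(name='English'), k=4,
    #   )
    #   workspace = Workspace(workspaceconfig, resourceconfig, storageconfig)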
    def add_doc(self, doc: Any) -> str:
        """
        Initializes a new :class:`Document<CorrectOCR.workspace.Document>` and adds it to the workspace.

        The doc_id of the document will be determined by its filename.

        If the file is not in the originalPath, it will be copied or downloaded there.

        :param doc: A path or URL.
        """
        self.log.debug(f'Preparing to add {doc}')
        if isinstance(doc, PurePath):
            if doc.parent != self._originalPath:
                FileIO.copy(doc, self._originalPath)
        elif isinstance(doc, str) and doc.startswith('http'):
            url = urllib.parse.urlparse(doc)
            new_doc_file = self._originalPath.joinpath(Path(url.path).name)
            r = requests.get(url.geturl())
            if r.status_code == 200:
                new_doc_file = new_doc_file.with_suffix('.pdf')  # TODO mimetype => suffix
                with open(new_doc_file, 'wb') as f:
                    f.write(r.content)
            else:
                self.log.error(f'Unable to save file: {r}')
            doc = new_doc_file
        else:
            raise ValueError(f'Cannot add doc from reference of unknown type: {type(doc)} {doc}')
        doc_id = doc.stem
        self.docs[doc_id] = Document(
            self,
            doc,
            self.root.joinpath(self.config.originalPath).resolve(),
            self.root.joinpath(self.config.goldPath).resolve(),
            self.root.joinpath(self.config.trainingPath).resolve(),
            self.root.joinpath(self.config.correctedPath).resolve(),
            self.nheaderlines,
        )
        Workspace.log.debug(f'Added {doc}')
        return doc_id
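    # Usage sketch (not part of the original module): documents can be added from
    # a local path or an HTTP(S) URL; the doc_id is the filename stem. The paths
    # and URL below are hypothetical.
    #
    #   doc_id = workspace.add_doc(Path('/data/scans/doc01.pdf'))   # copied into originalPath
    #   doc_id = workspace.add_doc('http://example.com/doc02.pdf')  # downloaded into originalPath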
    def docids_for_ext(self, ext: str) -> List[str]:
        """
        Returns a list of IDs for documents with the given extension.
        """
        return [docid for docid, doc in self.docs.items() if doc.ext == ext]
    def original_tokens(self) -> Iterator[Tuple[str, TokenList]]:
        """
        Yields an iterator of (docid, list of tokens).
        """
        for docid, doc in self.docs.items():
            Workspace.log.debug(f'Getting original tokens from {docid}')
            yield docid, doc.tokens
    def gold_tokens(self) -> Iterator[Tuple[str, TokenList]]:
        """
        Yields an iterator of (docid, list of gold-aligned tokens).
        """
        for docid, doc in self.docs.items():
            if doc.goldFile.is_file():
                doc.prepare('align', self.config.k)
                Workspace.log.debug(f'Getting gold tokens from {docid}')
                yield docid, [t for t in doc.tokens if t.gold is not None]
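    # Usage sketch (not part of the original module): iterating gold-aligned
    # tokens, e.g. to collect (original, gold) training pairs. Assumes a
    # configured `workspace` whose documents have gold files.
    #
    #   pairs = []
    #   for docid, tokens in workspace.gold_tokens():
    #       pairs.extend((t.original, t.gold) for t in tokens)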
    def cleanup(self, dryrun=True, full=False):
        """
        Cleans out the backup files in the ``trainingPath``.

        :param dryrun: Just lists the files without actually deleting them.
        :param full: Also deletes the current files (i.e. those without .nnn. in their suffix).
        """
        is_backup = re.compile(r'^\.\d\d\d$')

        for file in self._trainingPath.iterdir():
            if file.name[0] == '.':
                continue
            #self.log.debug(f'file: {file}')
            # guard against files with fewer than two suffixes before indexing
            if full or (len(file.suffixes) > 1 and is_backup.match(file.suffixes[-2])):
                self.log.info(f'Deleting {file}')
                if not dryrun:
                    FileIO.delete(file)
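    # Usage sketch (not part of the original module): a dry run only logs which
    # backup files (suffixes like `.002.csv`) would be deleted; pass dryrun=False
    # to actually delete them, and full=True to also delete the current files.
    #
    #   workspace.cleanup(dryrun=True)          # list candidates only
    #   workspace.cleanup(dryrun=False)         # delete backups
    #   workspace.cleanup(dryrun=False, full=True)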
    @cached
    def _cached_page_image(self, docid: str, page: int):
        Workspace.log.debug(f'_cached_page_image: {docid} page {page}')
        import fitz
        doc = fitz.open(str(self.docs[docid].originalFile))
        _page = doc[page]
        img_info = _page.getImageList()[0]
        xref = img_info[0]
        pix = fitz.Pixmap(doc, xref)
        return xref, _page.rect, pix
##########################################################################################
class Document(object):
    """
    Documents provide access to paths and :class:`Tokens<CorrectOCR.tokens.Token>`.
    """
    log = logging.getLogger(f'{__name__}.Document')

    def __init__(self, workspace: Workspace, doc: Path, original: Path, gold: Path, training: Path, corrected: Path, nheaderlines: int = 0):
        """
        :param doc: A path to a file.
        :param original: Directory for original, uncorrected files.
        :param gold: Directory for known-correct "gold" files (if any).
        :param training: Directory for storing intermediate files.
        :param corrected: Directory for saving corrected files.
        :param nheaderlines: Number of lines in file header (only relevant for ``.txt`` files).
        """
        self.workspace = workspace
        self.docid = doc.stem
        self.ext = doc.suffix
        if self.ext == '.txt':
            self.originalFile: Union[CorpusFile, Path] = CorpusFile(original.joinpath(doc.name), nheaderlines)
            self.goldFile: Union[CorpusFile, Path] = CorpusFile(gold.joinpath(doc.name), nheaderlines)
            self.correctedFile: Union[CorpusFile, Path] = CorpusFile(corrected.joinpath(doc.name), nheaderlines)
        else:
            self.originalFile = original.joinpath(doc.name)
            self.goldFile = gold.joinpath(doc.name)
            self.correctedFile = corrected.joinpath(doc.name)
        # CorpusFile has no exists(); is_file() is available on both CorpusFile and Path
        if not self.originalFile.is_file():
            raise ValueError(f'Cannot create Document with non-existing original file: {self.originalFile}')
        self.tokenFile = training.joinpath(f'{self.docid}.csv')  #: Path to token file (CSV format).
        self.fullAlignmentsFile = training.joinpath(f'{self.docid}.fullAlignments.json')  #: Path to full letter-by-letter alignments (JSON format).
        self.wordAlignmentsFile = training.joinpath(f'{self.docid}.wordAlignments.json')  #: Path to word-by-word alignments (JSON format).
        self.readCountsFile = training.joinpath(f'{self.docid}.readCounts.json')  #: Path to letter read counts (JSON format).
        self.tokens = TokenList.new(self.workspace.storageconfig)
        if TokenList.exists(self.workspace.storageconfig, self.docid):
            self.tokens.load(self.docid)
            Document.log.debug(f'Loaded {len(self.tokens)} tokens.')
    def alignments(self, force=False) -> Tuple[list, dict, list]:
        """
        Uses the :class:`Aligner<CorrectOCR.aligner.Aligner>` to generate alignments for a given
        (original, gold) pair of docs.

        Caches its results in the ``trainingPath``.

        :param force: Back up existing alignment docs and create new ones.
        """
        faPath = self.fullAlignmentsFile
        waPath = self.wordAlignmentsFile
        mcPath = self.readCountsFile

        if not force and (faPath.is_file() and waPath.is_file() and mcPath.is_file()):
            Document.log.info(
                f'Alignments for {self.docid} exist and are presumed correct, will read and return. '
                f'Use --force or delete alignments to rerun a subset.'
            )
            return (
                FileIO.load(faPath),
                {o: {int(k): v for k, v in i.items()} for o, i in FileIO.load(waPath).items()},
                FileIO.load(mcPath)
            )
        Document.log.info(f'Creating alignments for {self.docid}')

        if self.originalFile.body == self.goldFile.body:
            Document.log.critical(f'Original and gold are identical for {self.docid}!')
            raise SystemExit(-1)

        (fullAlignments, wordAlignments, readCounts) = Aligner().alignments(
            tokenize_str(self.originalFile.body, self.workspace.config.language.name),
            tokenize_str(self.goldFile.body, self.workspace.config.language.name)
        )

        FileIO.save(fullAlignments, faPath)
        FileIO.save(wordAlignments, waPath)
        FileIO.save(readCounts, mcPath)

        #Workspace.log.debug(f'wordAlignments: {wordAlignments}')

        return fullAlignments, wordAlignments, readCounts
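    # Usage sketch (not part of the original module): the three return values are
    # cached as JSON in trainingPath, so repeated calls are cheap unless force=True.
    #
    #   fullAlignments, wordAlignments, readCounts = doc.alignments()
    #
    # wordAlignments maps original word -> {token index -> gold word}, which is
    # why the cached copy's inner keys are converted back to int on load above.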
    def prepare(self, step: str, k: int, dehyphenate=False, force=False):
        """
        Prepares the :class:`Tokens<CorrectOCR.tokens.Token>` for the given doc.

        :param step: Which step to prepare for; prerequisite steps are run as needed.
           One of 'tokenize', 'align', 'kbest', 'bin', 'autocorrect' ('server' and
           'all' are aliases for 'autocorrect').
        :param k: How many `k`-best suggestions to calculate, if necessary.
        :param dehyphenate: Whether to attempt dehyphenization of tokens.
        :param force: Back up existing tokens and create new ones.
        """
        prep_methods = {
            'server': 'autocorrect',
            'all': 'autocorrect',
        }
        step = prep_methods.get(step, step)
        Document.log.info(f'Creating {step} tokens for {self.docid}')
        if step == 'tokenize':
            if force or len(self.tokens) == 0:
                tokenizer = Tokenizer.for_extension(self.ext)(self.workspace.config.language, dehyphenate)
                self.tokens = tokenizer.tokenize(
                    self.originalFile,
                    self.workspace.storageconfig
                )
        elif step == 'align':
            self.prepare('tokenize', k, dehyphenate, force)
            if self.goldFile.is_file():
                (_, wordAlignments, _) = self.alignments()
                for i, token in enumerate(self.tokens):
                    # TODO force
                    if not token.gold and token.original in wordAlignments:
                        wa = wordAlignments[token.original].items()
                        closest = sorted(wa, key=lambda x: abs(x[0]-i))
                        #Document.log.debug(f'{wa} {i} {token.original} {closest}')
                        token.gold = closest[0][1]
        elif step == 'kbest':
            if self.goldFile.is_file():
                self.prepare('align', k, dehyphenate, force)
            else:
                self.prepare('tokenize', k, dehyphenate, force)
            self.workspace.resources.hmm.generate_kbest(self.tokens, k, force)
        elif step == 'bin':
            self.prepare('kbest', k, dehyphenate, force)
            self.workspace.resources.heuristics.bin_tokens(self.tokens, force)
        elif step == 'autocorrect':
            self.prepare('bin', k, dehyphenate, force)
            for t in progressbar.progressbar(self.tokens):
                if force or not t.gold:
                    if t.decision in {'kbest', 'kdict'}:
                        t.gold = t.kbest[int(t.selection)].candidate
                    elif t.decision == 'original':
                        t.gold = t.original
        self.tokens.save()
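    # Usage sketch (not part of the original module): steps cascade, so requesting
    # 'autocorrect' runs bin -> kbest -> align/tokenize first as needed.
    #
    #   doc.prepare('autocorrect', k=4)  # tokens are saved at the end of prepare()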
    def crop_tokens(self, edge_left=None, edge_right=None):
        """
        Crops tokens to the given left/right edges.
        """
        Tokenizer.for_extension(self.ext).crop_tokens(self.originalFile, self.workspace.storageconfig, self.tokens, edge_left, edge_right)
        self.tokens.save()
class CorpusFile(object):
    """
    Simple wrapper for text files that manages a number of lines as a separate header.
    """
    log = logging.getLogger(f'{__name__}.CorpusFile')

    def __init__(self, path: Path, nheaderlines: int = 0):
        """
        :param path: Path to text file.
        :param nheaderlines: Number of lines from beginning to separate out as header.
        """
        self.path = path
        self.nheaderlines = nheaderlines
        if self.path.is_file():
            lines = FileIO.load(self.path).split('\n')
            # split on the header boundary, keeping line breaks intact
            (self.header, self.body) = (
                str.join('\n', lines[:nheaderlines]),
                str.join('\n', lines[nheaderlines:])
            )
        else:
            (self.header, self.body) = ('', '')
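    # Behavior sketch (not part of the original module): with nheaderlines=2, the
    # first two lines of the file become `header` and the rest becomes `body`.
    # The filename and `corrected_text` are hypothetical.
    #
    #   cf = CorpusFile(Path('corpus/doc01.txt'), nheaderlines=2)
    #   cf.body = corrected_text
    #   cf.save()  # re-attaches the header before writing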
    def save(self):
        """
        Concatenate header and body and save.
        """
        if not self.header or self.header.strip() == '':
            self.header = ''
        elif self.header[-1] != '\n':
            self.header += '\n'
        CorpusFile.log.info(f'Saving file to {self.path}')
        FileIO.save(self.header + self.body, self.path)
    def is_file(self) -> bool:
        """
        :return: Does the file exist? See :meth:`pathlib.Path.is_file`.
        """
        return self.path.is_file()
    @property
    def id(self):
        return self.path.stem
##########################################################################################
class JSONResource(dict):
    """
    Simple wrapper for JSON files.
    """
    log = logging.getLogger(f'{__name__}.JSONResource')

    def __init__(self, path, **kwargs):
        """
        :param path: Path to load from.
        :param kwargs: TODO
        """
        super().__init__(**kwargs)
        JSONResource.log.info(f'Loading {path}')
        self._path = path
        data = FileIO.load(self._path, default=dict())
        if data:
            self.update(data)
    def save(self):
        """
        Save to JSON file.
        """
        FileIO.save(self, self._path)
    def __repr__(self):
        return f'<JSONResource {self._path}: {dict(self)}>'
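# Usage sketch (not part of the original module): JSONResource behaves like a
# dict that can persist itself. The path and key below are hypothetical.
#
#   tracking = JSONResource(Path('resources/correction_tracking.json'))
#   tracking['Teh'] = tracking.get('Teh', 0) + 1
#   tracking.save()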
##########################################################################################
class ResourceManager(object):
    """
    Helper for the Workspace to manage various resources.
    """
    log = logging.getLogger(f'{__name__}.ResourceManager')

    def __init__(self, root: Path, config):
        """
        :param root: Path to resources directory.
        :param config: An object with the following properties:

           -  **correctionTrackingFile** (:class:`Path<pathlib.Path>`): Path to file containing correction tracking.
           -  TODO
        """
        self.root = root.joinpath(config.resourceRootPath).resolve()
        ResourceManager.log.info(f'ResourceManager configuration:\n{pformat(vars(config))} at {self.root}')
        self.correctionTracking = JSONResource(self.root.joinpath(config.correctionTrackingFile).resolve())
        self.memoizedCorrections = JSONResource(self.root.joinpath(config.memoizedCorrectionsFile).resolve())
        self.multiCharacterError = JSONResource(self.root.joinpath(config.multiCharacterErrorFile).resolve())
        self.dictionary = Dictionary(self.root.joinpath(config.dictionaryFile).resolve(), config.ignoreCase)
        self.hmm = HMM(self.root.joinpath(config.hmmParamsFile).resolve(), self.multiCharacterError, self.dictionary)
        self.reportFile = self.root.joinpath(config.reportFile).resolve()
        self.heuristics = Heuristics(JSONResource(self.root.joinpath(config.heuristicSettingsFile).resolve()), self.dictionary)