Source code for structa.source

# structa: an application for analyzing repetitive data structures
#
# Copyright (c) 2020-2021 Dave Jones <dave@waveform.org.uk>
#
# SPDX-License-Identifier: GPL-2.0-or-later

import io
import re
import csv
import json
import warnings
from chardet.universaldetector import UniversalDetector

from .errors import ValidationWarning

try:
    from ruamel import yaml
except ImportError:
    yaml = None


class Source:
    """
    A generalized data source capable of automatically recognizing certain
    popular data formats, and guessing character encodings. Constructed with a
    mandatory file-like object as the *source*, and a multitude of keyword-only
    options, the decoded content can be accessed from :attr:`data`.

    The *source* must have a :meth:`~io.RawIOBase.read` method which, given a
    number of bytes to return, returns a :class:`bytes` string up to that
    length, but has no requirements beyond this. Note that this means files
    over sockets or pipes are acceptable inputs.

    :param file source:
        The file-like object to decode (must have a ``read`` method).

    :param str encoding:
        The character encoding used in the source, or "auto" (the default) if
        it should be guessed from a sample of the data.

    :param bool encoding_strict:
        If :data:`True` (the default), raise an exception if character
        decoding errors occur. Otherwise, replace invalid characters silently.

    :param str format:
        If "auto" (the default), guess the format of the data source.
        Otherwise can be explicitly set to "csv", "yaml", or "json" to force
        parsing of that format.

    :param str csv_delimiter:
        If "auto" (the default), attempt to guess the field delimiter when the
        "csv" format is being decoded using the :class:`csv.Sniffer` class.
        Comma, semi-colon, space, and tab characters will be attempted.
        Otherwise must be set to the single character :class:`str` used as the
        field delimiter (e.g. ",").

    :param str csv_quotechar:
        If "auto" (the default), attempt to guess the string delimiter when
        the "csv" format is being decoded using the :class:`csv.Sniffer`
        class. Otherwise must be set to the single character :class:`str` used
        as the string delimiter (e.g. '"'). An empty string disables quote
        handling.

    :param bool yaml_safe:
        If :data:`True` (the default) the "safe" YAML parser from
        `ruamel.yaml`_ will be used.

    :param bool json_strict:
        If :data:`True` (the default), control characters will not be
        permitted inside decoded strings.

    :param int sample_limit:
        The number of bytes to sample from the beginning of the stream when
        attempting to determine character encoding. Defaults to 1MB.

    .. _ruamel.yaml: https://pypi.org/project/ruamel.yaml/
    """

    # Field delimiters considered when guessing the format and CSV dialect:
    # comma, semi-colon, space, and a real tab character
    _CSV_DELIMITERS = ',; \t'

    def __init__(self, source, *, encoding='auto', encoding_strict=True,
                 format='auto', csv_delimiter='auto', csv_quotechar='auto',
                 yaml_safe=True, json_strict=True, sample_limit=1048576):
        self._source = source
        self._encoding = encoding
        self._encoding_strict = encoding_strict
        self._format = format
        self._csv_delimiter = csv_delimiter
        self._csv_quotechar = csv_quotechar
        self._csv_dialect = None
        self._yaml_safe = yaml_safe
        self._json_strict = json_strict
        self._sample_limit = sample_limit
        # Cache of the bytes read so far for detection purposes
        self._sample = b''
        self._data = None

    @property
    def encoding(self):
        """
        The character encoding detected or specified for the source, e.g.
        "utf-8".
        """
        if self._encoding == 'auto':
            self._detect_encoding()
        return self._encoding

    @property
    def format(self):
        """
        The data format detected or specified for the source, e.g. "csv",
        "yaml", or "json".
        """
        if self._format == 'auto':
            self._detect_format()
        return self._format

    @property
    def csv_dialect(self):
        """
        The :class:`csv.Dialect` used when :attr:`format` is "csv", or
        :data:`None` otherwise.
        """
        if self.format != 'csv':
            return None
        if self._csv_dialect is None:
            self._detect_csv_dialect()
        return self._csv_dialect

    @property
    def data(self):
        """
        The decoded data. Typically a :class:`list` or :class:`dict` of
        values, but can be any value representable in the source format.
        """
        if self._data is None:
            self._load_data()
        return self._data

    def _sample_bytes(self):
        """
        Return the cached sample of raw bytes, first topping it up from the
        source (with a single ``read`` call) if it is shorter than the sample
        limit.
        """
        missing = self._sample_limit - len(self._sample)
        if missing > 0:
            self._sample += self._source.read(missing)
        return self._sample

    def _sample_str(self):
        """
        Return the sample decoded with :attr:`encoding`. Invalid sequences are
        always replaced here because the sample may end mid-character.
        """
        return self._sample_bytes().decode(self.encoding, errors='replace')

    def _detect_encoding(self):
        """
        Guess the character encoding from the sample, warning with
        :exc:`ValidationWarning` when the detector's confidence is below 0.9.
        """
        detector = UniversalDetector()
        detector.feed(self._sample_bytes())
        result = detector.close()
        if result['confidence'] < 0.9:
            warnings.warn(ValidationWarning(
                'Low confidence ({confidence}) in detected character set'.
                format_map(result)))
        # The detector reports None when it cannot identify anything (e.g. an
        # empty sample); storing None would make every later decode() call
        # fail with TypeError, so fall back to UTF-8 (the warning above has
        # already flagged the low confidence)
        self._encoding = result['encoding'] or 'utf-8'

    def _detect_format(self):
        """
        Guess the data format from the leading characters of the sample,
        deferring to :meth:`_detect_yaml_or_csv` when the content is neither
        JSON-like nor XML-like.
        """
        sample = self._sample_str()
        if sample[:5] == '<?xml':
            self._format = 'xml'
            return
        sample = sample.lstrip()
        if sample[:1] in ('[', '{'):
            self._format = 'json'
        elif sample[:5] == '<?xml':
            warnings.warn(ValidationWarning('whitespace before xml header'))
            self._format = 'xml'
        elif sample[:1] == '<':
            warnings.warn(ValidationWarning('missing xml header'))
            self._format = 'xml'
        else:
            self._detect_yaml_or_csv()

    def _detect_yaml_or_csv(self):
        """
        Score each sampled line for YAML-like and CSV-like traits and set the
        format to "yaml", "csv", or "unknown" accordingly.
        """
        # Strip potentially partial last line off
        lines = self._sample_str().splitlines(keepends=True)[:-1]
        field_delims = frozenset(self._CSV_DELIMITERS)
        csv_score = yaml_score = 0
        for line in lines:
            # Lines retain their terminator (keepends=True) so trailing
            # whitespace must be stripped before testing for a colon suffix
            if (
                line.startswith(('#', ' ', '-')) or
                line.rstrip().endswith(':')
            ):
                # YAML comments, indented lines, "-" prefixed items and colon
                # suffixes are all atypical in CSV and strong indicators of
                # YAML
                yaml_score += 2
                continue
            has_field_delims = not field_delims.isdisjoint(line)
            quote_count = max(line.count(delim) for delim in ('"', "'"))
            if has_field_delims and quote_count and not (quote_count % 2):
                # Both field and quote delimiters found in the line and quote
                # delimiters are paired. Also possible for YAML but the
                # presence of paired quotes is a strong indicator of CSV
                csv_score += 2
            elif line.count(':') == 1:
                # No quoted, field-delimited strings, but line contains
                # a single colon - weaker indicator of YAML
                yaml_score += 1
            elif has_field_delims:
                # No quote delimiters, but field delimiters are present
                # with no colon in the line - weaker indicator of CSV
                csv_score += 1
        if yaml_score > csv_score:
            self._format = 'yaml'
        elif csv_score > 0:
            self._format = 'csv'
        else:
            self._format = 'unknown'

    def _detect_csv_dialect(self):
        """
        Determine the CSV dialect: sniffed from the sample when either the
        delimiter or the quote character is "auto", otherwise built from the
        explicitly configured characters. An explicitly configured quote
        character always takes precedence over a sniffed one.
        """
        explicit_quote = self._csv_quotechar != 'auto'
        if self._csv_delimiter == 'auto' or not explicit_quote:
            # First line is possible header; only need a few Kb for
            # analysis
            sample = self._sample_str()
            sample = ''.join(sample.splitlines(keepends=True)[1:])[:8192]
            candidates = (
                self._CSV_DELIMITERS if self._csv_delimiter == 'auto' else
                self._csv_delimiter)
            # Sniffer.sniff returns a freshly created Dialect subclass, so it
            # is safe to adjust its attributes below
            dialect = csv.Sniffer().sniff(sample, delimiters=candidates)
            if explicit_quote:
                # Honour the caller's quote character rather than the guess
                dialect.quotechar = self._csv_quotechar or None
                if not self._csv_quotechar:
                    dialect.quoting = csv.QUOTE_NONE
        else:
            delimiter_char = self._csv_delimiter
            quote_char = self._csv_quotechar or None

            class dialect(csv.Dialect):
                delimiter = delimiter_char
                quotechar = quote_char
                escapechar = None
                doublequote = True
                lineterminator = '\r\n'
                # The csv module rejects QUOTE_MINIMAL without a quotechar
                quoting = csv.QUOTE_MINIMAL if quote_char else csv.QUOTE_NONE

        self._csv_dialect = dialect

    def _load_data(self):
        """
        Read the remainder of the source, decode it, and parse it according
        to :attr:`format`, storing the result for :attr:`data`.

        :raises ImportError:
            if the format is "yaml" and ruamel.yaml is not installed
        :raises NotImplementedError:
            if the format is "xml"
        :raises ValueError:
            if the format could not be guessed, or is not a recognized value
        """
        # The apparently pointless _sample_bytes call below isn't actually
        # pointless; it's required to set the _sample cache in case it's
        # queried by a later query of encoding, csv_dialect, etc.
        data = self._sample_bytes() + self._source.read()
        data = data.decode(
            self.encoding,
            errors='strict' if self._encoding_strict else 'replace')
        fmt = self.format
        if fmt == 'json':
            self._data = json.loads(data, strict=self._json_strict)
        elif fmt == 'csv':
            # Exclude the first row of data from analysis in case it's a header
            rows = data.splitlines(keepends=True)[1:]
            self._data = list(csv.reader(rows, self.csv_dialect))
        elif fmt == 'yaml':
            if not yaml:
                raise ImportError('ruamel.yaml package is not installed')
            # NOTE: with yaml_safe=False the unsafe loader is used; that must
            # only ever be enabled for trusted input
            loader = (
                yaml.SafeLoader if self._yaml_safe else yaml.UnsafeLoader)
            self._data = yaml.load(io.StringIO(data), Loader=loader)
        elif fmt == 'xml':
            raise NotImplementedError()
        elif fmt == 'unknown':
            raise ValueError('unable to guess data format')
        else:
            # Previously "assert False", which is stripped under -O and would
            # silently leave the data unset
            raise ValueError('unsupported data format: {!r}'.format(fmt))