Edge Domain
The Edge domain governs data import and export.
Importers
The importers live inside the importing directory at the project root. Each importer offers a way to import data into the workspace. Importers should be agnostic about the available analyzers. They do, however, currently provide a terminal user flow so that the user can customize their options; this is a necessity, since each importer may expose a different set of options and take a different UX approach to configuration.
The importers eventually write their data to a Parquet file whose path is provisioned by the application. (A sketch of what the importer interface might look like follows the entrypoint listing below.)
Here's what the entrypoint for the importer module looks like
./importing/__init__.py:
from .csv import CSVImporter
from .excel import ExcelImporter
from .importer import Importer, ImporterSession

importers: list[Importer[ImporterSession]] = [CSVImporter(), ExcelImporter()]
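The Importer and ImporterSession types themselves live in ./importing/importer.py and are not reproduced here. Based on the generic usage above and the terminal-driven flow described earlier, a minimal sketch of their shape might look like the following; the method names, signatures, and the run_import helper are assumptions for illustration, not the actual API.

from abc import ABC, abstractmethod
from typing import Generic, TypeVar

import polars as pl

SessionT = TypeVar("SessionT", bound="ImporterSession")


class ImporterSession(ABC):
    """Hypothetical: holds the user-chosen options for one import run."""

    @abstractmethod
    def load(self) -> pl.DataFrame:
        """Read the source data using the configured options."""


class Importer(ABC, Generic[SessionT]):
    """Hypothetical: one supported input format (CSV, Excel, ...)."""

    @abstractmethod
    def prompt(self, input_path: str) -> SessionT:
        """Run this importer's terminal flow to collect its options."""


def run_import(importer: Importer[SessionT], input_path: str, output_path: str) -> None:
    # Hypothetical driver: the application provisions output_path, and the
    # importer only decides how to read the source. The result is handed
    # off as a Parquet file, as described above.
    session = importer.prompt(input_path)
    session.load().write_parquet(output_path)

Keeping the terminal flow inside each importer lets formats with very different needs (delimiter detection for CSV, sheet selection for Excel) own their UX without the application having to know about any of it.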
Semantic Preprocessor
The Semantic Preprocessor lives inside the preprocessing directory at the project root. It defines all the column data semantics: a kind of type system used to guide the user toward selecting the right columns for the right analysis. It is agnostic about specific analyzers but does depend on them in a generic way; the available semantics exist to support the needs of analyzers and will be extended as necessary.
Here's what the entrypoint for the preprocessing module looks like
./preprocessing/series_semantic.py:
from datetime import datetime
from typing import Callable, Type, Union
import polars as pl
from pydantic import BaseModel
from analyzer_interface import DataType
class SeriesSemantic(BaseModel):
    semantic_name: str
    # Either a concrete polars dtype or a predicate over the column's dtype.
    column_type: Union[Type[pl.DataType], Callable[[pl.DataType], bool]]
    prevalidate: Callable[[pl.Series], bool] = lambda s: True
    try_convert: Callable[[pl.Series], pl.Series]
    validate_result: Callable[[pl.Series], pl.Series] = lambda s: s.is_not_null()
    data_type: DataType

    def check(self, series: pl.Series, threshold: float = 0.8, sample_size: int = 100):
        # A column matches this semantic if, on a sample, conversion succeeds
        # for more than `threshold` of the rows.
        if not self.check_type(series):
            return False
        sample = sample_series(series, sample_size)
        try:
            if not self.prevalidate(sample):
                return False
            result = self.try_convert(sample)
        except Exception:
            return False
        return self.validate_result(result).sum() / sample.len() > threshold

    def check_type(self, series: pl.Series):
        if isinstance(self.column_type, type):
            return isinstance(series.dtype, self.column_type)
        return self.column_type(series.dtype)
datetime_string = SeriesSemantic(
    semantic_name="datetime",
    column_type=pl.String,
    try_convert=lambda s: s.str.strptime(pl.Datetime, strict=False),
    data_type="datetime",
)

timestamp_seconds = SeriesSemantic(
    semantic_name="timestamp_seconds",
    column_type=lambda dt: dt.is_numeric(),
    try_convert=lambda s: (s * 1_000).cast(pl.Datetime(time_unit="ms")),
    validate_result=lambda s: ((s > datetime(2000, 1, 1)) & (s < datetime(2100, 1, 1))),
    data_type="datetime",
)

timestamp_milliseconds = SeriesSemantic(
    semantic_name="timestamp_milliseconds",
    column_type=lambda dt: dt.is_numeric(),
    try_convert=lambda s: s.cast(pl.Datetime(time_unit="ms")),
    validate_result=lambda s: ((s > datetime(2000, 1, 1)) & (s < datetime(2100, 1, 1))),
    data_type="datetime",
)

url = SeriesSemantic(
    semantic_name="url",
    column_type=pl.String,
    try_convert=lambda s: s.str.strip_chars(),
    validate_result=lambda s: s.str.count_matches("^https?://").gt(0),
    data_type="url",
)

identifier = SeriesSemantic(
    semantic_name="identifier",
    column_type=pl.String,
    try_convert=lambda s: s.str.strip_chars(),
    validate_result=lambda s: s.str.count_matches(r"^@?[A-Za-z0-9_.:-]+$").eq(1),
    data_type="identifier",
)

text_catch_all = SeriesSemantic(
    semantic_name="free_text",
    column_type=pl.String,
    try_convert=lambda s: s,
    validate_result=lambda s: constant_series(s, True),
    data_type="text",
)

integer_catch_all = SeriesSemantic(
    semantic_name="integer",
    column_type=lambda dt: dt.is_integer(),
    try_convert=lambda s: s,
    validate_result=lambda s: constant_series(s, True),
    data_type="integer",
)

float_catch_all = SeriesSemantic(
    semantic_name="float",
    column_type=lambda dt: dt.is_float(),
    try_convert=lambda s: s,
    validate_result=lambda s: constant_series(s, True),
    data_type="float",
)

boolean_catch_all = SeriesSemantic(
    semantic_name="boolean",
    column_type=pl.Boolean,
    try_convert=lambda s: s,
    validate_result=lambda s: constant_series(s, True),
    data_type="boolean",
)

# Order matters: inference returns the first matching semantic, so the
# catch-all semantics come last.
all_semantics = [
    datetime_string,
    timestamp_seconds,
    timestamp_milliseconds,
    url,
    identifier,
    text_catch_all,
    integer_catch_all,
    float_catch_all,
    boolean_catch_all,
]
def infer_series_semantic(
    series: pl.Series, *, threshold: float = 0.8, sample_size: int = 100
):
    for semantic in all_semantics:
        if semantic.check(series, threshold=threshold, sample_size=sample_size):
            return semantic
    return None


def sample_series(series: pl.Series, n: int = 100):
    if series.len() < n:
        return series
    return series.sample(n, seed=0)


def constant_series(series: pl.Series, constant) -> pl.Series:
    """Create a series with a constant value for each row of `series`."""
    return pl.Series([constant] * series.len(), dtype=pl.Boolean)
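As a quick illustration of how inference behaves, here is a hypothetical usage example. The DataFrame contents and column names are invented for demonstration, and the import assumes the project root is on the import path.

import polars as pl

from preprocessing.series_semantic import infer_series_semantic

df = pl.DataFrame(
    {
        "posted_at": ["2023-01-05 10:00:00", "2023-01-06 11:30:00"],
        "author": ["@alice", "bob_42"],
        "link": ["https://example.com/a", "https://example.com/b"],
        "score": [0.5, 0.9],
    }
)

for name in df.columns:
    semantic = infer_series_semantic(df[name])
    print(name, "->", semantic.semantic_name if semantic else None)

# Expected to print something like:
#   posted_at -> datetime
#   author -> identifier
#   link -> url
#   score -> float

Because all_semantics is ordered from most to least specific, a string column that parses as a datetime is claimed by the datetime semantic before the free_text catch-all ever sees it.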
Next Steps
Once you finish reading this section, it is a good idea to review the other domain sections, as well as the sections that discuss implementing the Shiny and React dashboards.