Skip to content

Core Domain

Core Domain

Application

The Application lives inside the app directory in the project root. This is responsible for defining and executing all capabilities of the application's workspace. Any extension or modification of the application's workspace capabilities should be done here.

The application code should be free of specific storage implementation and be agnostic about the specifics of the terminal interface and the available analyzers.

Here's what the entrypoint for the application module looks like

./app/init.py:

from .analysis_context import AnalysisContext
from .analysis_output_context import AnalysisOutputContext
from .analysis_webserver_context import AnalysisWebServerContext
from .app import App
from .app_context import AppContext
from .project_context import ProjectContext
from .settings_context import SettingsContext

Terminal Components

The Terminal Components live inside the terminal_tools inside the project root. Their main responsibility is user flow, rendering the terminal interface, and handling user input.

The user flow understandably depends on the set of capabilities offered by the Application, so an adjustment there may require an adjustment here.

Here's what the entrypoint for the termnal module looks like

./terminal_tools/init.py

from .progress import ProgressReporter
from .utils import (
    clear_printed_lines,
    clear_terminal,
    draw_box,
    enable_windows_ansi_support,
    open_directory_explorer,
    print_ascii_table,
    wait_for_key,
)

Storage IO

The Storage IO lives Inside the storage directory inside the project root. It is responsible for interacting directly with the file system where the workspace data and data files are stored. It makes decisions on paths, intermediate file formats, and database schema and implementation. It should know as little as possible about how the data is used and should be agnostic about the specifics of the terminal interface and the available analyzers.

Here's what the entrypoint for the storage module looks like

./storage/init.py:

import math
import os
import re
import shutil
from datetime import datetime
from typing import Callable, Iterable, Literal, Optional

import platformdirs
import polars as pl
import pyarrow.parquet as pq
from filelock import FileLock
from pydantic import BaseModel
from tinydb import Query, TinyDB
from xlsxwriter import Workbook

from analyzer_interface.interface import AnalyzerOutput
from analyzer_interface.params import ParamValue

from .file_selector import FileSelectorStateManager


class ProjectModel(BaseModel):
    class_: Literal["project"] = "project"
    id: str
    display_name: str


class SettingsModel(BaseModel):
    class_: Literal["settings"] = "settings"
    export_chunk_size: Optional[int | Literal[False]] = None


class FileSelectionState(BaseModel):
    class_: Literal["file_selector_state"] = "file_selector_state"
    last_path: Optional[str] = None


class AnalysisModel(BaseModel):
    class_: Literal["analysis"] = "analysis"
    analysis_id: str
    project_id: str
    display_name: str
    primary_analyzer_id: str
    path: str
    column_mapping: Optional[dict[str, str]] = None
    create_timestamp: Optional[float] = None
    param_values: dict[str, ParamValue] = dict()
    is_draft: bool = False

    def create_time(self):
        return (
            datetime.fromtimestamp(self.create_timestamp)
            if self.create_timestamp
            else None
        )


SupportedOutputExtension = Literal["parquet", "csv", "xlsx", "json"]


class Storage:
    def __init__(self, *, app_name: str, app_author: str):
        self.user_data_dir = platformdirs.user_data_dir(
            appname=app_name, appauthor=app_author, ensure_exists=True
        )
        self.temp_dir = platformdirs.user_cache_dir(
            appname=app_name, appauthor=app_author, ensure_exists=True
        )
        self.db = TinyDB(self._get_db_path())
        with self._lock_database():
            self._bootstrap_analyses_v1()

        self.file_selector_state = AppFileSelectorStateManager(self)

    def init_project(self, *, display_name: str, input_temp_file: str):
        with self._lock_database():
            project_id = self._find_unique_project_id(display_name)
            project = ProjectModel(id=project_id, display_name=display_name)
            self.db.insert(project.model_dump())

        project_dir = self._get_project_path(project_id)
        os.makedirs(project_dir, exist_ok=True)

        shutil.move(input_temp_file, self._get_project_input_path(project_id))
        return project

    def list_projects(self):
        q = Query()
        projects = self.db.search(q["class_"] == "project")
        return sorted(
            (ProjectModel(**project) for project in projects),
            key=lambda project: project.display_name,
        )

    def get_project(self, project_id: str):
        q = Query()
        project = self.db.search((q["class_"] == "project") & (q["id"] == project_id))
        if project:
            return ProjectModel(**project[0])
        return None

    def delete_project(self, project_id: str):
        with self._lock_database():
            q = Query()
            self.db.remove((q["id"] == project_id) & (q["class_"] == "project"))
        project_path = self._get_project_path(project_id)
        shutil.rmtree(project_path, ignore_errors=True)

    def rename_project(self, project_id: str, name: str):
        with self._lock_database():
            q = Query()
            self.db.update(
                {"display_name": name},
                (q["id"] == project_id) & (q["class_"] == "project"),
            )

    def load_project_input(self, project_id: str, *, n_records: Optional[int] = None):
        input_path = self._get_project_input_path(project_id)
        return pl.read_parquet(input_path, n_rows=n_records)

    def get_project_input_stats(self, project_id: str):
        input_path = self._get_project_input_path(project_id)
        num_rows = pl.scan_parquet(input_path).select(pl.count()).collect().item()
        return TableStats(num_rows=num_rows)

    def save_project_primary_outputs(
        self, analysis: AnalysisModel, outputs: dict[str, pl.DataFrame]
    ):
        for output_id, output_df in outputs.items():
            self._save_output(
                os.path.join(
                    self._get_project_primary_output_root_path(analysis),
                    output_id,
                ),
                output_df,
                "parquet",
            )

    def save_project_secondary_outputs(
        self,
        analysis: AnalysisModel,
        secondary_id: str,
        outputs: dict[str, pl.DataFrame],
    ):
        for output_id, output_df in outputs.items():
            self._save_output(
                os.path.join(
                    self._get_project_secondary_output_root_path(
                        analysis, secondary_id
                    ),
                    output_id,
                ),
                output_df,
                "parquet",
            )

    def save_project_secondary_output(
        self,
        analysis: AnalysisModel,
        secondary_id: str,
        output_id: str,
        output_df: pl.DataFrame,
        extension: SupportedOutputExtension,
    ):
        root_path = self._get_project_secondary_output_root_path(analysis, secondary_id)
        self._save_output(
            os.path.join(root_path, output_id),
            output_df,
            extension,
        )

    def _save_output(
        self,
        output_path_without_extension,
        output_df: pl.DataFrame | pl.LazyFrame,
        extension: SupportedOutputExtension,
    ):
        output_df = output_df.lazy()
        os.makedirs(os.path.dirname(output_path_without_extension), exist_ok=True)
        output_path = f"{output_path_without_extension}.{extension}"
        if extension == "parquet":
            output_df.sink_parquet(output_path)
        elif extension == "csv":
            output_df.sink_csv(output_path)
        elif extension == "xlsx":
            # See https://xlsxwriter.readthedocs.io/working_with_dates_and_time.html#timezone-handling
            with Workbook(output_path, {"remove_timezone": True}) as workbook:
                output_df.collect().write_excel(workbook)
        elif extension == "json":
            output_df.collect().write_json(output_path)
        else:
            raise ValueError(f"Unsupported format: {extension}")
        return output_path

    def load_project_primary_output(self, analysis: AnalysisModel, output_id: str):
        output_path = self.get_primary_output_parquet_path(analysis, output_id)
        return pl.read_parquet(output_path)

    def get_primary_output_parquet_path(self, analysis: AnalysisModel, output_id: str):
        return os.path.join(
            self._get_project_primary_output_root_path(analysis),
            f"{output_id}.parquet",
        )

    def load_project_secondary_output(
        self, analysis: AnalysisModel, secondary_id: str, output_id: str
    ):
        output_path = self.get_secondary_output_parquet_path(
            analysis, secondary_id, output_id
        )
        return pl.read_parquet(output_path)

    def get_secondary_output_parquet_path(
        self, analysis: AnalysisModel, secondary_id: str, output_id: str
    ):
        return os.path.join(
            self._get_project_secondary_output_root_path(analysis, secondary_id),
            f"{output_id}.parquet",
        )

    def export_project_primary_output(
        self,
        analysis: AnalysisModel,
        output_id: str,
        *,
        extension: SupportedOutputExtension,
        spec: AnalyzerOutput,
        export_chunk_size: Optional[int] = None,
    ):
        return self._export_output(
            self.get_primary_output_parquet_path(analysis, output_id),
            os.path.join(self._get_project_exports_root_path(analysis), output_id),
            extension=extension,
            spec=spec,
            export_chunk_size=export_chunk_size,
        )

    def export_project_secondary_output(
        self,
        analysis: AnalysisModel,
        secondary_id: str,
        output_id: str,
        *,
        extension: SupportedOutputExtension,
        spec: AnalyzerOutput,
        export_chunk_size: Optional[int] = None,
    ):
        exported_path = os.path.join(
            self._get_project_exports_root_path(analysis),
            (
                secondary_id
                if secondary_id == output_id
                else f"{secondary_id}__{output_id}"
            ),
        )
        return self._export_output(
            self.get_secondary_output_parquet_path(analysis, secondary_id, output_id),
            exported_path,
            extension=extension,
            spec=spec,
            export_chunk_size=export_chunk_size,
        )

    def _export_output(
        self,
        input_path: str,
        output_path: str,
        *,
        extension: SupportedOutputExtension,
        spec: AnalyzerOutput,
        export_chunk_size: Optional[int] = None,
    ):
        with pq.ParquetFile(input_path) as reader:
            num_chunks = (
                math.ceil(reader.metadata.num_rows / export_chunk_size)
                if export_chunk_size
                else 1
            )

        if num_chunks == 1:
            df = pl.scan_parquet(input_path)
            self._save_output(output_path, spec.transform_output(df), extension)
            return f"{output_path}.{extension}"

        with pq.ParquetFile(input_path) as reader:
            get_batches = (
                df
                for batch in reader.iter_batches()
                if (df := pl.from_arrow(batch)) is not None
            )
            for chunk_id, chunk in enumerate(
                collect_dataframe_chunks(get_batches, export_chunk_size)
            ):
                chunk = spec.transform_output(chunk)
                self._save_output(f"{output_path}_{chunk_id}", chunk, extension)
                yield chunk_id / num_chunks
            return f"{output_path}_[*].{extension}"

    def list_project_analyses(self, project_id: str):
        with self._lock_database():
            q = Query()
            analysis_models = self.db.search(
                (q["class_"] == "analysis") & (q["project_id"] == project_id)
            )
        return [AnalysisModel(**analysis) for analysis in analysis_models]

    def init_analysis(
        self,
        project_id: str,
        display_name: str,
        primary_analyzer_id: str,
        column_mapping: dict[str, str],
        param_values: dict[str, ParamValue],
    ) -> AnalysisModel:
        with self._lock_database():
            analysis_id = self._find_unique_analysis_id(project_id, display_name)
            analysis = AnalysisModel(
                analysis_id=analysis_id,
                project_id=project_id,
                display_name=display_name,
                primary_analyzer_id=primary_analyzer_id,
                path=os.path.join("analysis", analysis_id),
                column_mapping=column_mapping,
                create_timestamp=datetime.now().timestamp(),
                param_values=param_values,
                is_draft=True,
            )
            self.db.insert(analysis.model_dump())
        return analysis

    def save_analysis(self, analysis: AnalysisModel):
        with self._lock_database():
            q = Query()
            self.db.update(
                analysis.model_dump(),
                (q["class_"] == "analysis")
                & (q["project_id"] == analysis.project_id)
                & (q["analysis_id"] == analysis.analysis_id),
            )

    def delete_analysis(self, analysis: AnalysisModel):
        with self._lock_database():
            q = Query()
            self.db.remove(
                (q["class_"] == "analysis")
                & (q["project_id"] == analysis.project_id)
                & (q["analysis_id"] == analysis.analysis_id)
            )
            analysis_path = os.path.join(
                self._get_project_path(analysis.project_id), analysis.path
            )
            shutil.rmtree(analysis_path, ignore_errors=True)

    def _find_unique_analysis_id(self, project_id: str, display_name: str):
        return self._get_unique_name(
            self._slugify_name(display_name),
            lambda analysis_id: self._is_analysis_id_unique(project_id, analysis_id),
        )

    def _is_analysis_id_unique(self, project_id: str, analysis_id: str):
        q = Query()
        id_unique = not self.db.search(
            (q["class_"] == "analysis")
            & (q["project_id"] == project_id)
            & (q["analysis_id"] == analysis_id)
        )
        dir_unique = not os.path.exists(
            os.path.join(self._get_project_path(project_id), "analysis", analysis_id)
        )
        return id_unique and dir_unique

    def _bootstrap_analyses_v1(self):
        legacy_v1_analysis_dirname = "analyzers"
        projects = self.list_projects()
        for project in projects:
            project_id = project.id
            project_path = self._get_project_path(project_id)
            try:
                v1_analyses = os.listdir(
                    os.path.join(project_path, legacy_v1_analysis_dirname)
                )
            except FileNotFoundError:
                continue
            for analyzer_id in v1_analyses:
                db_analyzer_id = f"__v1__{analyzer_id}"
                modified_time = os.path.getmtime(
                    os.path.join(project_path, legacy_v1_analysis_dirname, analyzer_id)
                )
                self.db.upsert(
                    AnalysisModel(
                        analysis_id=db_analyzer_id,
                        project_id=project_id,
                        display_name=analyzer_id,
                        primary_analyzer_id=analyzer_id,
                        path=os.path.join(legacy_v1_analysis_dirname, analyzer_id),
                        create_timestamp=modified_time,
                    ).model_dump(),
                    (Query()["class_"] == "analysis")
                    & (Query()["project_id"] == project_id)
                    & (Query()["analysis_id"] == db_analyzer_id),
                )

    def list_secondary_analyses(self, analysis: AnalysisModel) -> list[str]:
        try:
            analyzers = os.listdir(
                os.path.join(
                    self._get_project_path(analysis.project_id),
                    analysis.path,
                    "secondary_outputs",
                ),
            )
            return analyzers
        except FileNotFoundError:
            return []

    def _find_unique_project_id(self, display_name: str):
        """Turn the display name into a unique project ID"""
        return self._get_unique_name(
            self._slugify_name(display_name), self._is_project_id_unique
        )

    def _is_project_id_unique(self, project_id: str):
        """Check the database if the project ID is unique"""
        q = Query()
        id_unique = not self.db.search(
            q["class_"] == "project" and q["id"] == project_id
        )
        dir_unique = not os.path.exists(self._get_project_path(project_id))
        return id_unique and dir_unique

    def _get_db_path(self):
        return os.path.join(self.user_data_dir, "db.json")

    def _get_project_path(self, project_id: str):
        return os.path.join(self.user_data_dir, "projects", project_id)

    def _get_project_input_path(self, project_id: str):
        return os.path.join(self._get_project_path(project_id), "input.parquet")

    def _get_project_primary_output_root_path(self, analysis: AnalysisModel):
        return os.path.join(
            self._get_project_path(analysis.project_id),
            analysis.path,
            "primary_outputs",
        )

    def _get_project_secondary_output_root_path(
        self, analysis: AnalysisModel, secondary_id: str
    ):
        return os.path.join(
            self._get_project_path(analysis.project_id),
            analysis.path,
            "secondary_outputs",
            secondary_id,
        )

    def _get_project_exports_root_path(self, analysis: AnalysisModel):
        return os.path.join(
            self._get_project_path(analysis.project_id), analysis.path, "exports"
        )

    def _get_web_presenter_state_path(self, analysis: AnalysisModel, presenter_id: str):
        return os.path.join(
            self._get_project_path(analysis.project_id),
            analysis.path,
            "web_presenters",
            presenter_id,
            "state",
        )

    def _lock_database(self):
        """
        Locks the database to prevent concurrent access, in case multiple instances
        of the application are running.
        """
        lock_path = os.path.join(self.temp_dir, "db.lock")
        return FileLock(lock_path)

    def get_settings(self):
        with self._lock_database():
            return self._get_settings()

    def _get_settings(self):
        q = Query()
        settings = self.db.search(q["class_"] == "settings")
        if settings:
            return SettingsModel(**settings[0])
        return SettingsModel()

    def save_settings(self, **kwargs):
        with self._lock_database():
            q = Query()
            settings = self._get_settings()
            new_settings = SettingsModel(
                **{
                    **settings.model_dump(),
                    **{
                        key: value for key, value in kwargs.items() if value is not None
                    },
                }
            )
            self.db.upsert(new_settings.model_dump(), q["class_"] == "settings")

    @staticmethod
    def _slugify_name(name: str):
        return re.sub(r"\W+", "_", name.lower()).strip("_")

    @staticmethod
    def _get_unique_name(base_name: str, validator: Callable[[str], bool]):
        if validator(base_name):
            return base_name
        i = 1
        while True:
            candidate = f"{base_name}_{i}"
            if validator(candidate):
                return candidate
            i += 1


class TableStats(BaseModel):
    num_rows: int


def collect_dataframe_chunks(
    input: Iterable[pl.DataFrame], size_threshold: int
) -> Iterable[pl.DataFrame]:
    output_buffer = []
    size = 0
    for df in input:
        while True:
            available_space = size_threshold - size
            slice = df.head(available_space)
            output_buffer.append(slice)
            size = size + slice.height
            remaining_space = available_space - slice.height

            if remaining_space == 0:
                yield pl.concat(output_buffer)
                output_buffer = []
                size = 0

            if slice.height == df.height:
                break
            else:
                df = df.tail(-available_space)

    if output_buffer:
        yield pl.concat(output_buffer)


class AppFileSelectorStateManager(FileSelectorStateManager):
    def __init__(self, storage: "Storage"):
        self.storage = storage

    def get_current_path(self):
        return self._load_state().last_path

    def set_current_path(self, path: str):
        self._save_state(path)

    def _load_state(self):
        q = Query()
        state = self.storage.db.search(q["class_"] == "file_selector_state")
        if state:
            return FileSelectionState(**state[0])
        return FileSelectionState()

    def _save_state(self, last_path: str):
        self.storage.db.upsert(
            FileSelectionState(last_path=last_path).model_dump(),
            Query()["class_"] == "file_selector_state",
        )

Next Steps

Once you finish reading this section it would be a good idea to review the other domain sections. Might also be a good idea to review the sections that discuss implementing Shiny, and React dashboards.