Skip to content

Testing

testing

testers

test_primary_analyzer(interface, main, *, input, outputs, params=dict())

Runs the primary analyzer test.

Parameters:

Name Type Description Default
interface
AnalyzerInterface

The interface of the analyzer.

required
main
Callable[[PrimaryAnalyzerContext], None]

The main function of the analyzer.

required
input
TestData

The input data.

required
params
dict[str, ParamValue]

(Optional) The analysis parameters.

dict()
outputs
dict[str, TestData]

The output data, keyed by output ID.

required
Source code in testing/testers.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@pytest.mark.skip()
def test_primary_analyzer(
    interface: AnalyzerInterface,
    main: Callable[[PrimaryAnalyzerContext], None],
    *,
    input: TestData,
    outputs: dict[str, TestData],
    params: dict[str, ParamValue] | None = None,
):
    """
    Runs the primary analyzer test.

    Converts the test input to parquet, runs the analyzer's ``main`` in a
    context backed by temporary directories, then compares each produced
    output against the expected data.

    Args:
        interface (AnalyzerInterface): The interface of the analyzer.
        main (Callable[[PrimaryAnalyzerContext], None]): The main function of the analyzer.
        input (TestData): The input data.
        params (dict[str, ParamValue] | None): (Optional) The analysis parameters.
        outputs (dict[str, TestData]): The output data, keyed by output ID.

    Raises:
        ValueError: If ``outputs`` contains IDs not declared by the interface,
            or if no declared output is compared at all.
    """
    # Use a None sentinel instead of a mutable `dict()` default, which would
    # be shared across calls.
    if params is None:
        params = {}

    with ExitStack() as exit_stack:
        # All temporary directories are cleaned up when the ExitStack unwinds.
        temp_dir = exit_stack.enter_context(TemporaryDirectory(delete=True))
        actual_output_dir = exit_stack.enter_context(TemporaryDirectory(delete=True))
        actual_input_dir = exit_stack.enter_context(TemporaryDirectory(delete=True))

        input_path = os.path.join(actual_input_dir, "input.parquet")
        input.convert_to_parquet(input_path)

        context = TestPrimaryAnalyzerContext(
            temp_dir=temp_dir,
            input_parquet_path=input_path,
            param_values=params,
            output_parquet_root_path=actual_output_dir,
        )
        main(context)

        # Reject expected outputs whose IDs the interface does not declare;
        # these would otherwise be silently skipped by the comparison loop.
        specified_outputs = [output_spec.id for output_spec in interface.outputs]
        unused_outputs = [
            output_id
            for output_id in outputs.keys()
            if output_id not in specified_outputs
        ]
        if unused_outputs:
            raise ValueError(
                f"The test case provided outputs that are not specified in the interface: {unused_outputs}"
            )

        # A test case that compares no outputs at all is almost certainly a mistake.
        has_compared_output = any(
            outputs.get(output_spec.id) is not None for output_spec in interface.outputs
        )
        if not has_compared_output:
            raise ValueError("The test case did not compare any outputs.")

        for output_spec in interface.outputs:
            expected_output_data = outputs.get(output_spec.id)
            if expected_output_data is None:
                continue

            actual_output_path = context.output_path(output_spec.id)

            expected_output = expected_output_data.load()
            actual_output = pl.read_parquet(actual_output_path)
            compare_dfs(actual_output, expected_output)

test_secondary_analyzer(interface, main, *, primary_params=dict(), primary_outputs, dependency_outputs=dict(), expected_outputs)

Runs the secondary analyzer test.

Parameters:

Name Type Description Default
interface
AnalyzerInterface

The interface of the analyzer.

required
main
Callable[[SecondaryAnalyzerContext], None]

The main function of the analyzer.

required
primary_params
dict[str, ParamValue]

(Optional) The primary analysis parameters.

dict()
primary_outputs
dict[str, TestData]

The primary output data, keyed by output ID.

required
dependency_outputs
dict[str, dict[str, TestData]]

The dependency output data, keyed by dependency ID and then by output ID.

dict()
expected_outputs
dict[str, TestData]

The expected output data, keyed by output ID.

required
Source code in testing/testers.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
@pytest.mark.skip()
def test_secondary_analyzer(
    interface: AnalyzerInterface,
    main: Callable[[SecondaryAnalyzerContext], None],
    *,
    primary_params: dict[str, ParamValue] | None = None,
    primary_outputs: dict[str, TestData],
    dependency_outputs: dict[str, dict[str, TestData]] | None = None,
    expected_outputs: dict[str, TestData],
):
    """
    Runs the secondary analyzer test.

    Materializes the primary and dependency outputs as parquet files in
    temporary directories, runs the analyzer's ``main`` against a context
    wired to those files, then compares each produced output against the
    expected data.

    Args:
        interface (AnalyzerInterface): The interface of the analyzer.
        main (Callable[[SecondaryAnalyzerContext], None]): The main function of the analyzer.
        primary_params (dict[str, ParamValue] | None): (Optional) The primary analysis parameters.
        primary_outputs (dict[str, TestData]): The primary output data, keyed by output ID.
        dependency_outputs (dict[str, dict[str, TestData]] | None): The dependency output data, keyed by dependency ID and then by output ID.
        expected_outputs (dict[str, TestData]): The expected output data, keyed by output ID.

    Raises:
        ValueError: If ``expected_outputs`` contains IDs not declared by the
            interface, or if no declared output is compared at all.
    """
    # Use None sentinels instead of mutable `dict()` defaults, which would be
    # shared across calls.
    if primary_params is None:
        primary_params = {}
    if dependency_outputs is None:
        dependency_outputs = {}

    with ExitStack() as exit_stack:
        # All temporary directories are cleaned up when the ExitStack unwinds.
        temp_dir = exit_stack.enter_context(TemporaryDirectory(delete=True))
        actual_output_dir = exit_stack.enter_context(TemporaryDirectory(delete=True))
        actual_base_output_dir = exit_stack.enter_context(
            TemporaryDirectory(delete=True)
        )
        # One directory per dependency, so same-named outputs cannot collide.
        actual_dependency_output_dirs = {
            dependency_id: exit_stack.enter_context(TemporaryDirectory(delete=True))
            for dependency_id in dependency_outputs.keys()
        }

        # Write the primary analyzer's outputs where the context will look for them.
        for output_id, output_data in primary_outputs.items():
            output_data.convert_to_parquet(
                os.path.join(actual_base_output_dir, f"{output_id}.parquet")
            )

        # Likewise for each dependency's outputs.
        for dependency_id, dependency_output in dependency_outputs.items():
            for output_id, output_data in dependency_output.items():
                output_data.convert_to_parquet(
                    os.path.join(
                        actual_dependency_output_dirs[dependency_id],
                        f"{output_id}.parquet",
                    )
                )

        context = TestSecondaryAnalyzerContext(
            temp_dir=temp_dir,
            primary_param_values=primary_params,
            primary_output_parquet_paths={
                output_id: os.path.join(actual_base_output_dir, f"{output_id}.parquet")
                for output_id in primary_outputs.keys()
            },
            dependency_output_parquet_paths={
                dependency_id: {
                    output_id: os.path.join(
                        actual_dependency_output_dirs[dependency_id],
                        f"{output_id}.parquet",
                    )
                    for output_id in dependency_output.keys()
                }
                for dependency_id, dependency_output in dependency_outputs.items()
            },
            output_parquet_root_path=actual_output_dir,
        )
        main(context)

        # Reject expected outputs whose IDs the interface does not declare;
        # these would otherwise be silently skipped by the comparison loop.
        specified_outputs = [output_spec.id for output_spec in interface.outputs]
        unused_outputs = [
            output_id
            for output_id in expected_outputs.keys()
            if output_id not in specified_outputs
        ]
        if unused_outputs:
            raise ValueError(
                f"The test case provided outputs that are not specified in the interface: {unused_outputs}"
            )

        # A test case that compares no outputs at all is almost certainly a mistake.
        has_compared_output = any(
            expected_outputs.get(output_spec.id) is not None
            for output_spec in interface.outputs
        )
        if not has_compared_output:
            raise ValueError("The test case did not compare any outputs.")

        for output_spec in interface.outputs:
            expected_output_data = expected_outputs.get(output_spec.id)
            if expected_output_data is None:
                continue

            actual_output_path = context.output_path(output_spec.id)

            expected_output = expected_output_data.load()
            actual_output = pl.read_parquet(actual_output_path)
            compare_dfs(actual_output, expected_output)