Skip to content

Importing

importing

Modules:

Name Description
csv
importer

Classes

Modules

csv

Classes:

Name Description
CSVImporter
Classes
CSVImporter

Bases: Importer['CsvImportSession']

Source code in importing/csv.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
class CSVImporter(Importer["CsvImportSession"]):
    @property
    def name(self) -> str:
        return "CSV"

    def suggest(self, input_path: str) -> bool:
        return input_path.endswith(".csv")

    def _detect_skip_rows_and_dialect(self, input_path: str) -> tuple[int, csv.Dialect]:
        """Detect the number of rows to skip before CSV data begins and the CSV dialect."""
        skip_rows = 0

        try:
            with open(input_path, "r", encoding="utf8") as file:

                MAX_LINES = 50  # check the first 50 lines only
                lines = [line.strip() for i, line in enumerate(file) if i <= MAX_LINES]
                total_lines = len(lines)

                # Only analyze if we have enough lines
                if len(lines) >= 2:
                    # Parse each line and analyze content, keeping track of original line numbers
                    parsed_rows = []
                    line_numbers = []
                    for line_idx, line in enumerate(lines):
                        if not line:  # Skip empty lines
                            continue
                        try:
                            reader = csv.reader([line])
                            row = next(reader)
                            parsed_rows.append(row)
                            line_numbers.append(line_idx)
                        except Exception:
                            parsed_rows.append([line])  # Fallback for problematic lines
                            line_numbers.append(line_idx)

                    if len(parsed_rows) >= 2:
                        # Look for the actual CSV header (column names)
                        for i, row in enumerate(parsed_rows):
                            if self._looks_like_csv_header(row):
                                skip_rows = line_numbers[i]
                                break
                        else:
                            # Fallback: use field count analysis
                            field_counts = [len(row) for row in parsed_rows]
                            from collections import Counter

                            count_frequency = Counter(field_counts)
                            most_common_count = count_frequency.most_common(1)[0][0]

                            # Find first row that matches the most common field count
                            for i, count in enumerate(field_counts):
                                if count == most_common_count:
                                    skip_rows = line_numbers[i]
                                    break

                # Validate skip_rows doesn't exceed available lines
                if skip_rows >= total_lines:
                    skip_rows = 0  # Reset to safe default

                # Now detect dialect from the CSV content (after skip_rows)
                file.seek(0)
                for _ in range(skip_rows):
                    file.readline()

                sample = file.read(65536)
                dialect = Sniffer().sniff(sample)

        except Exception:
            # If anything fails, use defaults and try basic dialect detection
            skip_rows = 0
            try:
                with open(input_path, "r", encoding="utf8") as file:
                    sample = file.read(65536)
                    dialect = Sniffer().sniff(sample)
            except Exception:
                # Create a default dialect if everything fails
                class DefaultDialect:
                    delimiter = ","
                    quotechar = '"'

                dialect = DefaultDialect()

        return skip_rows, dialect

    def _looks_like_csv_header(self, row: list[str]) -> bool:
        """Check if a row looks like a CSV header with column names."""
        if not row or len(row) < 2:
            return False

        # Skip rows where most fields are empty (likely CSV notes with trailing commas)
        non_empty_fields = [field.strip() for field in row if field.strip()]
        if len(non_empty_fields) < len(row) // 2:
            return False

        # Look for typical CSV header characteristics
        header_indicators = 0

        for field in non_empty_fields:
            field = field.lower().strip()

            # Common column name patterns
            if any(
                word in field
                for word in [
                    "id",
                    "name",
                    "date",
                    "time",
                    "user",
                    "tweet",
                    "text",
                    "count",
                    "number",
                    "sent",
                    "screen",
                    "retweeted",
                    "favorited",
                ]
            ):
                header_indicators += 1

            # Short descriptive column names (not long sentences like CSV notes)
            if 3 <= len(field) <= 30 and not field.startswith(
                ("http", "www", "from ", "if you")
            ):
                header_indicators += 1

        # Consider it a CSV header if at least 50% of non-empty fields look like column names
        return header_indicators >= len(non_empty_fields) * 0.5

    def init_session(self, input_path: str):
        """Create the initial import session from auto-detected settings.

        The separator, quote character and number of rows to skip come from
        ``_detect_skip_rows_and_dialect``; the file is assumed to have a header.
        """
        detected_skip, detected_dialect = self._detect_skip_rows_and_dialect(
            input_path
        )
        session_options = dict(
            input_file=input_path,
            separator=detected_dialect.delimiter,
            quote_char=detected_dialect.quotechar,
            has_header=True,
            skip_rows=detected_skip,
        )
        return CsvImportSession(**session_options)

    def manual_init_session(self, input_path: str):
        """Build an import session by asking the user for every option.

        The prompts run in order: separator, quote character, header, rows to
        skip. Returns None as soon as any prompt yields None.
        """
        if (chosen_separator := self._separator_option(None)) is None:
            return None

        if (chosen_quote := self._quote_char_option(None)) is None:
            return None

        if (header_answer := self._header_option(None)) is None:
            return None

        if (rows_to_skip := self._skip_rows_option(None)) is None:
            return None

        return CsvImportSession(
            input_file=input_path,
            separator=chosen_separator,
            quote_char=chosen_quote,
            has_header=header_answer,
            skip_rows=rows_to_skip,
        )

    def modify_session(
        self,
        input_path: str,
        import_session: "CsvImportSession",
        reset_screen: Callable[["CsvImportSession"], None],
    ) -> Optional["CsvImportSession"]:
        """Interactively edit ``import_session`` until the user is done.

        Args:
            input_path: Path of the file being imported (not used here).
            import_session: The session to modify in place.
            reset_screen: Called with the current session before every menu
                is shown.

        Returns:
            The (mutated) session when the user picks "Done", or None when the
            menu prompt yields None. A None result from an individual option
            prompt leaves that option unchanged and shows the menu again.
        """
        is_first_time = True
        while True:
            reset_screen(import_session)
            action = prompts.list_input(
                "What would you like to change?",
                choices=[
                    ("Column separator", "separator"),
                    ("Quote character", "quote_char"),
                    ("Header", "header"),
                    ("Skip rows", "skip_rows"),
                    ("Done. Use these options.", "done"),
                ],
                # After the first pass, pre-select "done" so the user can
                # confirm quickly.
                default=None if is_first_time else "done",
            )
            is_first_time = False

            if action is None:
                return None
            if action == "done":
                return import_session

            if action == "separator":
                separator = self._separator_option(import_session.separator)
                if separator is not None:
                    import_session.separator = separator
            elif action == "quote_char":
                quote_char = self._quote_char_option(import_session.quote_char)
                if quote_char is not None:
                    import_session.quote_char = quote_char
            elif action == "header":
                has_header = self._header_option(import_session.has_header)
                if has_header is not None:
                    import_session.has_header = has_header
            elif action == "skip_rows":
                skip_rows = self._skip_rows_option(import_session.skip_rows)
                if skip_rows is not None:
                    import_session.skip_rows = skip_rows

    @staticmethod
    def _separator_option(previous_value: Optional[str]) -> Optional[str]:
        """Ask the user for the column separator.

        Args:
            previous_value: The current separator, used to pre-select a menu
                entry; None means nothing is pre-selected.

        Returns:
            The chosen separator, or None when a prompt yields None or the
            custom separator is blank after stripping whitespace.
        """
        presets = [
            ("comma (,)", ","),
            ("semicolon (;)", ";"),
            ("Pipe (|)", "|"),
            ("Tab", "\t"),
        ]
        preset_values = [value for _, value in presets]

        # Pre-select the matching preset (including the pipe), or "Other" for
        # any custom separator that is already set.
        if previous_value is None:
            default = None
        elif previous_value in preset_values:
            default = previous_value
        else:
            default = "other"

        choice: Optional[str] = prompts.list_input(
            "Select the column separator",
            choices=[*presets, ("Other", "other")],
            default=default,
        )
        if choice is None:
            return None
        if choice != "other":
            return choice

        custom: Optional[str] = prompts.text("Enter the separator")
        if custom is None:
            return None
        custom = custom.strip()
        if len(custom) == 0:
            return None
        # Return the typed separator; previously this fell through to None.
        return custom

    @staticmethod
    def _quote_char_option(previous_value: Optional[str]) -> Optional[str]:
        """Ask the user for the quote character.

        Args:
            previous_value: The current quote character, used to pre-select a
                menu entry; None means nothing is pre-selected.

        Returns:
            The chosen quote character, or None when a prompt yields None or
            the custom value is blank after stripping whitespace.
        """
        if previous_value is None:
            default = None
        elif previous_value in ('"', "'"):
            default = previous_value
        else:
            default = "other"

        choice: Optional[str] = prompts.list_input(
            "Select the quote character",
            choices=[
                ('Double quote (")', '"'),
                ("Single quote (')", "'"),
                ("Other", "other"),
            ],
            default=default,
        )
        if choice is None:
            return None
        if choice != "other":
            return choice

        custom: Optional[str] = prompts.text("Enter the quote character")
        if custom is None:
            return None
        custom = custom.strip()
        if len(custom) == 0:
            return None
        # Return the typed character; previously this fell through to None.
        return custom

    def _header_option(self, previous_value: Optional[bool]) -> Optional[bool]:
        """Ask whether the file has a header row, pre-selecting ``previous_value``."""
        yes_no_choices = [
            ("Yes", True),
            ("No", False),
        ]
        answer = prompts.list_input(
            "Does the file have a header?",
            choices=yes_no_choices,
            default=previous_value,
        )
        return answer

    @staticmethod
    def _skip_rows_option(previous_value: Optional[int]) -> Optional[int]:
        """Prompt until the user enters a usable row count or cancels.

        Negative or non-numeric input is reported and asked again. A count
        above 10 must be confirmed; declining asks again. Returns None only
        when the text prompt itself yields None.
        """
        prefill = "0" if previous_value is None else str(previous_value)

        while True:
            answer = prompts.text(
                f"Number of rows to skip at the beginning of file (current: {previous_value or 0}).",
                default=prefill,
            )
            if answer is None:  # Prompt was cancelled
                return None

            try:
                requested = int(answer.strip())
                if requested < 0:
                    print_message(
                        "Skip rows cannot be negative. Please try again.", "error"
                    )
                elif requested <= 10 or prompts.confirm(
                    f"Skip {requested} rows? This seems high. Continue?",
                    default=True,
                ):
                    return requested
                # Otherwise the high value was declined: loop and ask again
            except ValueError:
                print_message("Please enter a valid number.", "error")

importer

Classes:

Name Description
Importer
ImporterSession

The ImporterSession interface handles the ongoing configuration of an import.

Classes
Importer

Bases: ABC

Methods:

Name Description
init_session

Produces an initial import session object that contains all the configuration needed for the import.

modify_session

Performs the interactive UI sequence that customizes the import session from the initial one.

suggest

Check if the importer can handle the given file. This should be fairly restrictive, as it is only used for the initial importer suggestion.

Attributes:

Name Type Description
name str

The name of the importer. It will be quoted in the UI in texts such as "Imported as name", so keep it to a format name.

Source code in importing/importer.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class Importer[SessionType](ABC):
    @property
    @abstractmethod
    def name(self) -> str:
        """
        The name of the importer. It will be quoted in the UI in texts such as
        "Imported as `name`, so keep it to a format name."
        """
        pass

    @abstractmethod
    def suggest(self, input_path: str) -> bool:
        """
        Check if the importer can handle the given file. This should be fairly
        restrictive based on reasonable assumptions, as it is only used for the
        initial importer suggestion. The user can always override the suggestion.
        """
        pass

    @abstractmethod
    def init_session(self, input_path: str) -> Optional[SessionType]:
        """
        Produces an initial import session object that contains all the configuration
        needed for the import. The user can either accept this configuration or
        customize it.

        Return None here if the importer cannot figure out how to configure the
        import parameters. This doesn't necessarily mean that the file cannot be
        loaded; the UI will force the user to customize the import session if the
        user wants to proceed with this importer.
        """
        pass

    @abstractmethod
    def manual_init_session(self, input_path: str) -> Optional[SessionType]:
        pass

    @abstractmethod
    def modify_session(
        self,
        input_path: str,
        import_session: SessionType,
        reset_screen: Callable[[SessionType], None],
    ) -> Optional[SessionType]:
        """
        Performs the interactive UI sequence that customizes the import session
        from the initial one.

        Return None here if the user interrupts the customization process.
        """
        pass
Attributes
name abstractmethod property

The name of the importer. It will be quoted in the UI in texts such as "Imported as name", so keep it to a format name.

Functions
init_session(input_path) abstractmethod

Produces an initial import session object that contains all the configuration needed for the import. The user can either accept this configuration or customize it.

Return None here if the importer cannot figure out how to configure the import parameters. This doesn't necessarily mean that the file cannot be loaded; the UI will force the user to customize the import session if the user wants to proceed with this importer.

Source code in importing/importer.py
61
62
63
64
65
66
67
68
69
70
71
72
73
@abstractmethod
def init_session(self, input_path: str) -> Optional[SessionType]:
    """
    Produce the initial import session object holding all the configuration
    needed for the import. The user can accept this configuration or
    customize it.

    Return None if the importer cannot work out how to configure the import
    parameters. That does not necessarily mean the file cannot be loaded; the
    UI will force the user to customize the import session if they want to
    proceed with this importer.
    """
modify_session(input_path, import_session, reset_screen) abstractmethod

Performs the interactive UI sequence that customizes the import session from the initial one.

Return None here if the user interrupts the customization process.

Source code in importing/importer.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@abstractmethod
def modify_session(
    self,
    input_path: str,
    import_session: SessionType,
    reset_screen: Callable[[SessionType], None],
) -> Optional[SessionType]:
    """
    Run the interactive UI sequence that customizes the import session,
    starting from the initial one.

    Return None if the user interrupts the customization process.
    """
suggest(input_path) abstractmethod

Check if the importer can handle the given file. This should be fairly restrictive based on reasonable assumptions, as it is only used for the initial importer suggestion. The user can always override the suggestion.

Source code in importing/importer.py
52
53
54
55
56
57
58
59
@abstractmethod
def suggest(self, input_path: str) -> bool:
    """
    Tell whether this importer can handle the given file.

    Keep the check fairly restrictive and based on reasonable assumptions:
    it only drives the initial importer suggestion, which the user can
    always override.
    """
ImporterSession

Bases: ABC

The ImporterSession interface handles the ongoing configuration of an import. It keeps the configuration state, knows how to print the configuration to the console, and can load a preview of the data from the input file.

Methods:

Name Description
import_as_parquet

Import the data from the input file to the output file in the Parquet format.

load_preview

Attempt to load a preview of the data from the input file.

print_config

Print the configuration of the import session to the console.

Source code in importing/importer.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class ImporterSession(ABC):
    """
    Interface for the ongoing configuration of an import.

    An ImporterSession keeps the configuration state, knows how to print that
    configuration to the console, and can load a preview of the data from the
    input file.
    """

    @abstractmethod
    def print_config(self) -> None:
        """
        Print this import session's configuration to the console.
        """

    @abstractmethod
    def load_preview(self, n_records: int) -> Optional[pl.DataFrame]:
        """
        Try to load a preview of the data from the input file.

        Return None when it is certain that the file cannot be loaded with the
        current configuration. Only throw an exception for unexpected errors.
        """

    @abstractmethod
    def import_as_parquet(self, output_path: str) -> None:
        """
        Import the data from the input file into the output file, in the
        Parquet format.
        """
Functions
import_as_parquet(output_path) abstractmethod

Import the data from the input file to the output file in the Parquet format.

Source code in importing/importer.py
31
32
33
34
35
36
@abstractmethod
def import_as_parquet(self, output_path: str) -> None:
    """
    Import the data from the input file into the output file, in the
    Parquet format.
    """
load_preview(n_records) abstractmethod

Attempt to load a preview of the data from the input file.

Return None here if it is sure that the file cannot be loaded with the current configuration. Only throw an exception in the case of unexpected errors.

Source code in importing/importer.py
21
22
23
24
25
26
27
28
29
@abstractmethod
def load_preview(self, n_records: int) -> Optional[pl.DataFrame]:
    """
    Try to load a preview of the data from the input file.

    Return None when it is certain that the file cannot be loaded with the
    current configuration. Only throw an exception for unexpected errors.
    """
print_config() abstractmethod

Print the configuration of the import session to the console.

Source code in importing/importer.py
14
15
16
17
18
19
@abstractmethod
def print_config(self) -> None:
    """
    Print this import session's configuration to the console.
    """