CSV: csv.reader, csv.DictReader, dialects, quoting
CSV (Comma-Separated Values) is one of the most widely used text formats for data exchange. RFC 4180 formalizes the format, but in practice every system emits its own dialect (Excel, Unix, MySQL, custom). The stdlib csv module is the canonical parser/writer and handles the edge cases (quoting, escapes, multi-line fields, dialects). Pragmatic-DEEP rule: never write your own CSV parser; csv.reader covers RFC 4180 plus the de-facto dialects.
In this lesson:
- csv.reader / csv.DictReader: parsing API.
- csv.writer / csv.DictWriter: writing API.
- Dialects: csv.excel, csv.unix_dialect, custom registration.
- Quoting modes: table of 4 constants.
- csv.Sniffer: automatic dialect detection.
- Pitfalls 21 / 24 / 26: single-string input, dict-not-OrderedDict, quoting mismatch.
- Code challenge py-m09-02-code-1: Pattern 1 (CSV via io.StringIO + DictReader).
- Cross-course → ClickHouse FORMAT CSV clause.
- Cross-course → Spark spark.read.csv schema inference.
csv.reader — list-of-lists parsing
The basic parser: yields one list of strings per row.
import csv
import io
csv_data = "name,age,role\nalice,30,dev\nbob,25,qa\n"
buf = io.StringIO(csv_data)
reader = csv.reader(buf)
for row in reader:
    print(row)
# ['name', 'age', 'role']
# ['alice', '30', 'dev']
# ['bob', '25', 'qa']
All values are str. CSV has no types; conversion (e.g., int(row[1])) is the caller's responsibility.
csv.reader accepts any iterable of strings: a file object, a list of strings, io.StringIO. A single string does not work as expected (see Pitfall 21 below). In the M09 challenges we always wrap the input in io.StringIO(...).
Reference: docs.python.org/3/library/csv.html#csv.reader.
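A minimal sketch of the caller-side conversion described above, assuming the same three-column sample and that the age column always holds integers. The names `header`, `age_idx` and `ages` are chosen for illustration only.

```python
import csv
import io

csv_data = "name,age,role\nalice,30,dev\nbob,25,qa\n"
reader = csv.reader(io.StringIO(csv_data))

header = next(reader)            # consume the header row first
age_idx = header.index("age")    # locate the column by name, not by position

# Every field arrives as str, so the age column is converted explicitly.
ages = [int(row[age_idx]) for row in reader]
print(ages)  # [30, 25]
```

Looking the column up by name keeps the conversion correct if the vendor reorders columns.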
csv.DictReader — list-of-dicts parsing
Uses the first row as the header and returns one dict per row:
import csv
import io
csv_data = "name,age,role\nalice,30,dev\nbob,25,qa\n"
buf = io.StringIO(csv_data)
reader = csv.DictReader(buf)
for row in reader:
    print(row)
# {'name': 'alice', 'age': '30', 'role': 'dev'}
# {'name': 'bob', 'age': '25', 'role': 'qa'}
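Pitfall 24 (carried over from M02 lesson 03): since Python 3.8, DictReader returns a plain dict; in Python 3.6 and 3.7 it returned an OrderedDict. Plain dicts preserve insertion order (a language guarantee since Python 3.7), so the column order is still kept. Pyodide ships Python 3.12+, so you get dict. This matters for test assertions that compare printed output: the two types compare equal, but repr({'a': 1}) ≠ repr(OrderedDict([('a', 1)])).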
Custom fieldnames (when the header is missing):
reader = csv.DictReader(
    buf,
    fieldnames=['name', 'age', 'role'],  # ← the caller provides the column names
)
# Now the first row is treated as a data row, too
restkey / restval handle row-length mismatches:
- restkey='extras': extra trailing columns in a long row are collected as a list in row['extras'].
- restval='?': missing columns in a short row are filled in, so row[col] = '?'.
Reference: docs.python.org/3/library/csv.html#csv.DictReader.
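To make the two options concrete, here is a small sketch with one row that is too long and one that is too short. The sample data and the `'extras'` / `'?'` values are illustrative choices, not defaults (both options default to None).

```python
import csv
import io

# Row 2 has two extra columns; row 3 is missing the age column.
csv_data = "name,age\nalice,30\nbob,25,qa,remote\ncarol\n"

reader = csv.DictReader(io.StringIO(csv_data), restkey="extras", restval="?")
rows = list(reader)

print(rows[1])  # {'name': 'bob', 'age': '25', 'extras': ['qa', 'remote']}
print(rows[2])  # {'name': 'carol', 'age': '?'}
```

Picking a restval that cannot occur in real data makes short rows easy to spot during validation.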
csv.writer / csv.DictWriter
The writers mirror the readers and accept a list of lists or a list of dicts:
import csv
import io
# csv.writer — list of lists
out = io.StringIO()
writer = csv.writer(out)
writer.writerow(['name', 'age', 'role']) # header
writer.writerows([
    ['alice', 30, 'dev'],  # the int is converted to str automatically
    ['bob', 25, 'qa'],
])
print(out.getvalue())
# name,age,role
# alice,30,dev
# bob,25,qa
# csv.DictWriter — list of dicts
out = io.StringIO()
fieldnames = ['name', 'age', 'role']
writer = csv.DictWriter(out, fieldnames=fieldnames)
writer.writeheader()
writer.writerow({'name': 'alice', 'age': 30, 'role': 'dev'})
writer.writerow({'name': 'bob', 'age': 25, 'role': 'qa'})
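Production rule: when writing to disk, open the file with newline='' (carried over from lesson 01). The csv writer already emits its own \r\n line endings, and without newline='' text mode on Windows would translate them again into \r\r\n:
with open('out.csv', 'w', encoding='utf-8', newline='') as f:
    writer = csv.writer(f)
    ...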
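The effect of newline='' can be checked by inspecting the raw bytes. The sketch below writes to a temporary directory (the file name is hypothetical) and includes a field with an embedded newline to show that the round trip is lossless.

```python
import csv
import os
import tempfile

rows = [["name", "note"], ["alice", "line one\nline two"]]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "out.csv")  # hypothetical file name

    # newline='' passes the writer's own line endings to the file untouched.
    with open(path, "w", encoding="utf-8", newline="") as f:
        csv.writer(f).writerows(rows)

    with open(path, "rb") as f:
        raw = f.read()

    # Reading with newline='' keeps the newline inside the quoted field intact.
    with open(path, encoding="utf-8", newline="") as f:
        round_trip = list(csv.reader(f))

print(raw)                 # b'name,note\r\nalice,"line one\nline two"\r\n'
print(round_trip == rows)  # True
```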
Dialects: csv.excel, csv.unix_dialect, custom
A dialect is a bundle of formatting settings (delimiter, quotechar, lineterminator, quoting). The stdlib ships three:
| Dialect | delimiter | quotechar | lineterminator | quoting |
|---|---|---|---|---|
| csv.excel (default) | ',' | '"' | '\r\n' | QUOTE_MINIMAL |
| csv.unix_dialect | ',' | '"' | '\n' | QUOTE_ALL |
| csv.excel_tab (TSV) | '\t' | '"' | '\r\n' | QUOTE_MINIMAL |
Pass a dialect by registered name or by class: csv.reader(buf, dialect='unix') or csv.reader(buf, csv.unix_dialect).
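The differences in the table are easiest to see on the writing side. This sketch renders one row with each built-in dialect, addressed by its registered name ('excel', 'unix', 'excel-tab'); the `render` helper is hypothetical and exists only for this illustration.

```python
import csv
import io

def render(row, dialect):
    """Write one row with the given dialect and return the raw text."""
    out = io.StringIO()
    csv.writer(out, dialect=dialect).writerow(row)
    return out.getvalue()

row = ["alice", 30, "dev"]
excel_text = render(row, "excel")
unix_text = render(row, "unix")
tab_text = render(row, "excel-tab")

print(repr(excel_text))  # 'alice,30,dev\r\n'
print(repr(unix_text))   # '"alice","30","dev"\n'
print(repr(tab_text))    # 'alice\t30\tdev\r\n'
```

Note that the unix dialect quotes the integer as well, because QUOTE_ALL applies to every field.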
Custom dialect via csv.register_dialect:
import csv
import io

csv.register_dialect(
    'pipe',
    delimiter='|',
    quotechar='"',
    lineterminator='\n',
    quoting=csv.QUOTE_MINIMAL,
)
buf = io.StringIO('name|age\nalice|30\n')  # sample input for illustration
reader = csv.reader(buf, dialect='pipe')
Quoting modes — 4 constants
| Constant | Numeric | Behavior |
|---|---|---|
| csv.QUOTE_MINIMAL (default) | 0 | Quote only fields that contain the delimiter, the quotechar, or a newline |
| csv.QUOTE_ALL | 1 | Quote every field (even a simple 'alice') |
| csv.QUOTE_NONNUMERIC | 2 | On write, quote all non-numeric fields; on read, convert unquoted fields to float |
| csv.QUOTE_NONE | 3 | Never quote; escape the delimiter with escapechar |
Python 3.12 added two more constants, csv.QUOTE_STRINGS and csv.QUOTE_NOTNULL, which this lesson does not cover.
Pitfall 26: with QUOTE_NONE and no escapechar, the writer raises csv.Error: need to escape, but no escapechar set as soon as a field contains the delimiter. Set escapechar='\\' for TSV-like formats:
writer = csv.writer(out, quoting=csv.QUOTE_NONE, escapechar='\\')
writer.writerow(['hello, world', 'value'])
# hello\, world,value ← the comma is escaped with a backslash
QUOTE_NONNUMERIC is useful when you know that every non-numeric field is a quoted string and every numeric field is a float. csv.reader(buf, quoting=csv.QUOTE_NONNUMERIC) converts unquoted fields to float automatically (and raises ValueError if a field cannot be parsed).
buf = io.StringIO('"alice",30.0\n"bob",25.0\n')
reader = csv.reader(buf, quoting=csv.QUOTE_NONNUMERIC)
print(list(reader))
# [['alice', 30.0], ['bob', 25.0]] ← 30.0 is a float, not a str
Reference: docs.python.org/3/library/csv.html#csv.QUOTE_MINIMAL.
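As a side-by-side illustration of the four modes, the sketch below writes the same row under each constant. The row is chosen so that it contains a plain string, a number, and a string with an embedded comma; the `render` helper and the `results` dict are hypothetical names used only here, and lineterminator='\n' is set just to keep the output compact.

```python
import csv
import io

def render(row, **fmt):
    """Write one row with the given formatting options and return the text."""
    out = io.StringIO()
    csv.writer(out, lineterminator="\n", **fmt).writerow(row)
    return out.getvalue()

row = ["alice", 30, "a,b"]
results = {
    "QUOTE_MINIMAL": render(row, quoting=csv.QUOTE_MINIMAL),
    "QUOTE_ALL": render(row, quoting=csv.QUOTE_ALL),
    "QUOTE_NONNUMERIC": render(row, quoting=csv.QUOTE_NONNUMERIC),
    # QUOTE_NONE needs an escapechar because the last field contains a comma.
    "QUOTE_NONE": render(row, quoting=csv.QUOTE_NONE, escapechar="\\"),
}
for name, text in results.items():
    print(f"{name:17}{text}", end="")
# QUOTE_MINIMAL    alice,30,"a,b"
# QUOTE_ALL        "alice","30","a,b"
# QUOTE_NONNUMERIC "alice",30,"a,b"
# QUOTE_NONE       alice,30,a\,b
```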
csv.Sniffer — automatic dialect detection
When a vendor's file format is unknown, csv.Sniffer infers the delimiter, the quotechar, and whether a header row is present:
import csv
import io
sample = '''name|age|role
alice|30|dev
bob|25|qa
'''
sniffer = csv.Sniffer()
dialect = sniffer.sniff(sample)
print(dialect.delimiter) # '|'
has_header = sniffer.has_header(sample)
print(has_header) # True
buf = io.StringIO(sample)
reader = csv.reader(buf, dialect)
for row in reader:
    print(row)
Pragmatic warning: Sniffer is a heuristic and can guess wrong. In production pipelines, pin the dialect explicitly (it is known from the vendor contract). Sniffer is useful for CLI tools, one-off ETL, and interactive exploration.
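For the exploratory cases where Sniffer is appropriate, one possible guard is to restrict the candidate delimiters and fall back to a pinned default when the heuristic gives up. This is a sketch, not a recommended production pattern; `detect_delimiter`, its candidate set, and its default are hypothetical.

```python
import csv

def detect_delimiter(sample: str, candidates: str = ",;|\t", default: str = ",") -> str:
    """Return the sniffed delimiter, or `default` if sniffing fails."""
    try:
        # `delimiters` limits the guess to characters we consider plausible.
        return csv.Sniffer().sniff(sample, delimiters=candidates).delimiter
    except csv.Error:
        # Raised when Sniffer cannot determine a delimiter.
        return default

piped = detect_delimiter("name|age|role\nalice|30|dev\nbob|25|qa\n")
single_column = detect_delimiter("alice\nbob\ncarol\n")

print(repr(piped))          # '|'
print(repr(single_column))  # ',' (fallback: no candidate character appears)
```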
Pitfall 21 — csv.reader requires iterable of strings
What goes wrong:
import csv
reader = csv.reader("alice,30\nbob,25\n") # ← single string
for row in reader:
    print(row)
# Effect: iterates character by character, not line by line
# ['a'], ['l'], ['i'], ...
Why: csv.reader treats its input as an iterable of strings. Iterating a single string yields characters, not lines, so each character becomes a "row".
How to avoid: always wrap the text in io.StringIO("...") or use splitlines():
buf = io.StringIO("alice,30\nbob,25\n")
reader = csv.reader(buf)
# OR
reader = csv.reader("alice,30\nbob,25\n".splitlines())
In the M09 challenges, Pattern 1 enforces io.StringIO.
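The two workarounds are not fully equivalent once a quoted field spans several lines. The sketch below, using made-up sample text, suggests why io.StringIO is the safer default: splitlines() strips the line endings, so the newline inside the quoted field is lost.

```python
import csv
import io

# The note field contains an embedded newline inside quotes.
text = 'name,note\nalice,"line one\nline two"\n'

via_stringio = list(csv.reader(io.StringIO(text)))
via_splitlines = list(csv.reader(text.splitlines()))

print(via_stringio[1])    # ['alice', 'line one\nline two']
print(via_splitlines[1])  # ['alice', 'line oneline two']
```

If a list of lines is required, text.splitlines(keepends=True) preserves the line endings and so keeps the embedded newline.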
Code-challenge py-m09-02-code-1 — Pattern 1 setup
The quiz JSON 02-csv-stdlib.json embeds this challenge:
Given a CSV string with the columns name,age,role, use io.StringIO and csv.DictReader to return a list of dicts in which age is converted to int.
Solution skeleton (revealed after submission):
import io
import csv

def solve(csv_str: str) -> list[dict]:
    buf = io.StringIO(csv_str)
    reader = csv.DictReader(buf)
    rows = []
    for row in reader:
        row['age'] = int(row['age'])
        rows.append(row)
    return rows
Test cases (3: 2 visible + 1 hidden):
- 3 data rows (alice / bob / carol).
- Empty body, header only → [].
- Hidden: checks that age is an int (not a str).
This is the canonical Pattern 1; all M09 challenges repeat the same io.StringIO simulation.
Cross-course → ClickHouse FORMAT CSV
ClickHouse supports 70+ ingestion formats, and CSV is one of the most universal. The ClickHouse course, lesson 11/07 (FORMAT clause), covers 7 basic formats (CSV / TSV / JSONEachRow / JSON / Parquet / Native / Values) and their performance tradeoffs.
ClickHouse INSERT INTO t FORMAT CSV follows largely the same parsing semantics as Python's csv.reader: baseline RFC 4180 compatibility plus dialect tuning through the format_csv_delimiter and format_csv_allow_double_quotes settings. The difference is that ClickHouse parsing is distributed (replicas) and vectorized (16K-row blocks via the Block structure), but the conceptual model is the same.
Bridge insight: the same parsing logic runs on three execution layers: csv.DictReader (single-threaded Python) → Spark spark.read.csv (distributed JVM cluster) → ClickHouse FORMAT CSV (vectorized columnar). The recipe below stays stable; only the runtime changes.
Cross-course → Spark spark.read.csv schema inference
Spark is the distributed evolution of the same model. The Spark course, lesson 03/01 (DataFrame creation + schema), covers spark.read.csv("path") with the inferSchema=True option.
# Spark equivalent (does NOT run in Pyodide; Run-on-Your-Machine reference)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')  # type inference (a stopgap; in production, pin the schema)
    .csv('s3://bucket/users.csv')
)
df.printSchema()
# root
# |-- name: string (nullable = true)
# |-- age: integer (nullable = true) ← inferred from the data
# |-- role: string (nullable = true)
Difference:
- Stdlib csv.DictReader: everything is str; the caller converts.
- Spark inferSchema: infers types automatically, at the cost of an extra pass over the data (the samplingRatio option, default 1.0, sets the fraction of rows used). In production, pin the schema explicitly (StructType) for consistency between runs.
Cross-course bridge: the stdlib csv foundation and the Spark CSV reader share the same RFC 4180 semantics; Spark adds schema inference, distributed ingestion, and Parquet output downstream.
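The idea of pinning a schema instead of inferring it has a rough stdlib analogue: declare one converter per column and apply it to every row. This is only a sketch of the concept in plain Python, not Spark's StructType API; `SCHEMA` and `read_typed` are hypothetical names.

```python
import csv
import io

# Hypothetical pinned schema: column name -> converter.
SCHEMA = {"name": str, "age": int, "role": str}

def read_typed(csv_str: str, schema: dict) -> list[dict]:
    """Parse CSV text and convert each column with its pinned converter."""
    reader = csv.DictReader(io.StringIO(csv_str))
    # Fail fast if the header does not match the declared columns.
    if reader.fieldnames != list(schema):
        raise ValueError(f"unexpected header: {reader.fieldnames}")
    return [{col: cast(row[col]) for col, cast in schema.items()} for row in reader]

rows = read_typed("name,age,role\nalice,30,dev\nbob,25,qa\n", SCHEMA)
print(rows)
# [{'name': 'alice', 'age': 30, 'role': 'dev'}, {'name': 'bob', 'age': 25, 'role': 'qa'}]
```

Because the types are declared rather than guessed, two runs over different samples of the data cannot disagree about the column types.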
Recipe — production CSV pipeline
End-to-end: read CSV → validate → typed records → write filtered subset.
import io
import csv
from dataclasses import dataclass

@dataclass
class User:
    name: str
    age: int
    role: str

def parse_users(csv_str: str) -> list[User]:
    """Parse CSV → typed User records. Skip invalid rows."""
    buf = io.StringIO(csv_str)
    reader = csv.DictReader(buf)
    out = []
    for row in reader:
        try:
            out.append(User(
                name=row['name'],
                age=int(row['age']),
                role=row['role'],
            ))
        except (ValueError, KeyError, TypeError):
            # TypeError: a short row leaves row['age'] as None
            continue  # skip malformed rows
    return out

def write_filtered(users: list[User], min_age: int) -> str:
    """Write the filtered subset as a CSV string."""
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=['name', 'age', 'role'])
    writer.writeheader()
    for u in users:
        if u.age >= min_age:
            writer.writerow({'name': u.name, 'age': u.age, 'role': u.role})
    return out.getvalue()

# Usage
csv_in = "name,age,role\nalice,30,dev\nbob,25,qa\ncarol,abc,pm\n"
users = parse_users(csv_in)
print([u.name for u in users])  # ['alice', 'bob'] ← carol skipped (invalid age)
csv_out = write_filtered(users, min_age=27)
print(csv_out)
# name,age,role
# alice,30,dev
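The recipe skips malformed rows silently. Where the validation step should report them instead, one possible variant records the source line of each rejected row through the reader's line_num attribute. `find_bad_rows` is a hypothetical helper, and it checks only the age column.

```python
import csv
import io

def find_bad_rows(csv_str: str) -> list[tuple[int, dict]]:
    """Return (line number, raw row) for every row whose age is not an int."""
    reader = csv.DictReader(io.StringIO(csv_str))
    bad = []
    for row in reader:
        try:
            int(row["age"])
        except (KeyError, TypeError, ValueError):
            # line_num counts the lines read so far, header included.
            bad.append((reader.line_num, row))
    return bad

csv_in = "name,age,role\nalice,30,dev\nbob,25,qa\ncarol,abc,pm\n"
bad_rows = find_bad_rows(csv_in)
print(bad_rows)
# [(4, {'name': 'carol', 'age': 'abc', 'role': 'pm'})]
```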
What's in the next lesson
Lesson 03 covers the JSON stdlib API (json.loads / json.dumps, JSONL streaming, custom encoders). Code challenge py-m09-03-code-1 uses Pattern 2 (JSON nested traversal). Cross-course → Spark spark.read.json + ClickHouse JSONEachRow. It follows the same io.StringIO simulation pattern with different value types (Python dict/list/int/float ↔ JSON object/array/number).
Pragmatic-DEEP principle: we do not deep-dive into the internals of the _csv C extension. The stdlib csv.reader is battle-tested and production-grade; learn the API, not the CPython source. If you need throughput above 1M rows/sec, move to pyarrow / polars (lesson M10), not to a hand-rolled parser.