Learning Platform
Урок 09.03 · 24 мин
Начальный
pandasdatetimeresamplerolling windowtime-series
SQL: TIMESTAMP, TIMESTAMPTZ, часовые пояса в PostgreSQL Batch-обработка: окна, расписание и идемпотентность

Половина работы DE — это даты

Любой реальный датасет в работе DE содержит даты. Время заказа, время визита, дата отчёта, окно SLA. Дальше — стандартные операции: «выручка по дням», «недельный rolling-average», «процент от прошлой недели». Это всё про даты и временные окна.

pandas умеет работать с датами очень хорошо — это была одна из причин её популярности с 2010-х. В этом уроке — нужный минимум: парсинг, dt-аксессор, ресэмплинг, rolling, сравнения через shift/diff. Без этого DE-скриптов не пишут.

Парсинг дат

В прошлых уроках мы видели parse_dates=["col"] в read_csv и pd.to_datetime(series). Разберёмся глубже.

import pandas as pd

# из строк ISO 8601 — pandas всё делает сам
s = pd.to_datetime(["2025-01-15", "2025-02-20", "2025-03-10"])
print(s)
# DatetimeIndex(['2025-01-15', '2025-02-20', '2025-03-10'], dtype='datetime64[ns]')

# с нестандартным форматом — лучше указать явно (быстрее, надёжнее)
s = pd.to_datetime(["15/01/2025", "20/02/2025"], format="%d/%m/%Y")

# с временем и таймзоной
s = pd.to_datetime(["2025-01-15T10:00:00+03:00"], utc=True)
# конвертирует в UTC: 2025-01-15 07:00:00+00:00
TIP

Всегда указывайте utc=True, если в данных есть timezone. Хранить все timestamps в UTC, конвертировать в локальное время только при выводе пользователю — единственный способ не сойти с ума с летним временем и разными зонами.

format — это

strftime-синтаксис
. Базовые директивы: %Y (год 4 знака), %m (месяц), %d (день), %H:%M:%S (часы:минуты:секунды). Указание format ускоряет парсинг — pandas не угадывает.

При грязных данных:

s = pd.to_datetime(["2025-01-15", "BADDATE", "2025-03-10"], errors="coerce")
# Index 1 будет NaT (not-a-time, аналог NaN для дат)

dt accessor — лезем в детали

Когда колонка имеет тип datetime64, у Series появляется специальный

dt accessor
:

df = pd.DataFrame({
    "event_time": pd.to_datetime([
        "2025-01-15 10:30:00",
        "2025-02-20 14:45:00",
        "2025-03-10 09:15:00",
    ]),
})

df["year"]       = df["event_time"].dt.year       # 2025
df["month"]      = df["event_time"].dt.month      # 1, 2, 3
df["day"]        = df["event_time"].dt.day        # 15, 20, 10
df["dayofweek"]  = df["event_time"].dt.dayofweek  # 2, 3, 0 (Monday=0)
df["hour"]       = df["event_time"].dt.hour       # 10, 14, 9
df["date_only"]  = df["event_time"].dt.date       # date без времени
df["floor_day"]  = df["event_time"].dt.floor("D") # обнуляет время до 00:00
df["floor_h"]    = df["event_time"].dt.floor("h") # до начала часа
df["floor_15m"]  = df["event_time"].dt.floor("15min")  # до 15-мин интервала

dt.floor — частая операция для агрегации: «приведи timestamp к началу суток» или «округли до 5-минутного бакета».

Полезные булевы:

df["event_time"].dt.is_month_end
df["event_time"].dt.is_quarter_start
df["event_time"].dt.weekday < 5   # будний день?

date_range — генерация дат

Нужно сгенерировать список дат для бекфилла? Календарь?

# каждый день января 2025
pd.date_range("2025-01-01", "2025-01-31", freq="D")

# каждый понедельник
pd.date_range("2025-01-01", "2025-04-01", freq="W-MON")

# каждый месяц (последний день месяца)
pd.date_range("2025-01-01", "2025-12-31", freq="ME")

# каждые 15 минут
pd.date_range("2025-01-01", periods=96, freq="15min")

freq
понимает много обозначений: D, W, MS/ME, Q, h, min, s, и составные 2h, 15min, 7D.

Польза: набор дат-меток для merge со «всем календарём» — чтобы заполнить дыры в данных днями без активности.

resample — group by времени

Это главная операция для time-series в DE. Идея та же, что groupby, но группа — это временное окно.

# event-stream с timestamps
events = pd.DataFrame({
    "event_time": pd.to_datetime([
        "2025-01-15 10:00:00",
        "2025-01-15 10:30:00",
        "2025-01-15 14:00:00",
        "2025-01-16 09:00:00",
        "2025-01-16 11:00:00",
    ]),
    "user_id": [1, 2, 1, 3, 1],
    "amount":  [100, 200, 50, 300, 150],
})

# нужен datetime-index для resample
events = events.set_index("event_time")

# daily aggregate
events.resample("D").agg(
    total_amount=("amount", "sum"),
    n_events=("amount", "size"),
    unique_users=("user_id", "nunique"),
)
#             total_amount  n_events  unique_users
# event_time
# 2025-01-15           350         3             2
# 2025-01-16           450         2             2

# hourly aggregate
events.resample("h").agg({"amount": "sum"})

# 15-минутные окна
events.resample("15min").agg({"amount": "sum"})

# weekly с конца недели в воскресенье
events.resample("W-SUN").agg({"amount": "sum"})

resample требует datetime в индексе. Если данные с datetime-колонкой, делайте set_index("event_time") перед resample. После — reset_index() если нужно обратно.

Пустые окна (часы без событий) автоматически появляются в результате со значением 0 или NaN — в зависимости от агрегата. Это удобно для построения непрерывного time-series.

resample('D') на event-stream

Каждое событие падает в бакет своего дня. Агрегат внутри бакета.

event_timeamount
01-15 10:00100
01-15 10:30200
01-15 14:0050
01-16 09:00300
01-16 11:00150
resample('D')
daytotal_amount
2025-01-15350 (3 events)
2025-01-16450 (2 events)

resample с заполнением пустых окон

# upsample — добавление промежуточных точек
daily = events.resample("D").agg({"amount": "sum"})

# если в каком-то дне ничего не было — будет 0
# (для sum это default; для других — NaN, тогда fillna(0))

# downsample-fill при разреженности
daily = events["amount"].resample("h").sum().fillna(0)

rolling window — скользящие агрегаты

«Семидневный moving average выручки» — это

rolling window
:

daily = events.resample("D").agg({"amount": "sum"})

# 7-day moving average
daily["ma7"] = daily["amount"].rolling(window=7, min_periods=1).mean()

# 30-day sum
daily["ma30_sum"] = daily["amount"].rolling(window=30, min_periods=1).sum()

# rolling с временным окном (не количество точек, а интервал)
daily["last_3d_avg"] = daily["amount"].rolling("3D").mean()

window=7 — фиксированное число точек. "3D" — временной интервал; требует datetime-index. min_periods=1 — какое минимальное число точек нужно, чтобы вернуть значение (без него первые 6 будут NaN).

Польза для DE-отчётов: сглаживание шума, тренды, anomaly detection («сегодняшняя выручка отклонилась от 7-day moving average на > 2σ»).

shift и diff — сравнения периодов

shift(n) — сдвиг Series на n позиций. diff(n) — разность с n-й предыдущей точкой.

daily = events.resample("D").agg({"amount": "sum"})

# выручка вчера
daily["amount_yesterday"] = daily["amount"].shift(1)

# изменение день-к-дню
daily["dod_change"] = daily["amount"].diff(1)

# изменение неделя-к-неделе
daily["wow_change"] = daily["amount"].diff(7)

# процентное изменение
daily["dod_pct"] = daily["amount"].pct_change(1) * 100

pct_change(n) = (x_now - x_n_before) / x_n_before. Удобно для отчётов «выручка на X% выше прошлой недели».

DE-кейс: WoW отчёт из event-stream

Соберём всё вместе. Дано: лог событий с timestamps и amount. Нужно: ежедневный сводный отчёт с WoW-сравнением и 7d moving average.

import pandas as pd

# 1. Читаем сырой event-stream
events = pd.read_parquet("events.parquet")  # колонки: event_time, user_id, amount

# 2. Парсим время в UTC, ставим индексом
events["event_time"] = pd.to_datetime(events["event_time"], utc=True)
events = events.set_index("event_time")

# 3. Daily aggregate
daily = events.resample("D").agg(
    revenue=("amount", "sum"),
    n_events=("amount", "size"),
    n_users=("user_id", "nunique"),
)

# 4. Заполняем дни без событий нулями
daily["revenue"] = daily["revenue"].fillna(0)
daily["n_events"] = daily["n_events"].fillna(0).astype("int64")
daily["n_users"] = daily["n_users"].fillna(0).astype("int64")

# 5. Сравнения
daily["revenue_prev_week"] = daily["revenue"].shift(7)
daily["wow_pct"] = daily["revenue"].pct_change(7) * 100

# 6. 7-day moving average
daily["revenue_ma7"] = daily["revenue"].rolling(7, min_periods=1).mean()

# 7. Отбираем последний месяц
last_month = daily.last("30D")

Это типичный pipeline для report-aggregation, который в любой компании DE пишет на третий день работы. Шесть строк pandas против ~50 строк голого Python.

Подводные камни с таймзонами

Один из самых частых багов начинающих — путаница naive/aware datetime.

# naive — без таймзоны
pd.to_datetime("2025-01-15 10:00:00")   # Timestamp('2025-01-15 10:00:00')

# aware — с таймзоной
pd.to_datetime("2025-01-15 10:00:00+03:00")   # с таймзоной UTC+3
pd.to_datetime("2025-01-15 10:00:00", utc=True)   # с явной UTC

Нельзя сравнивать или вычитать naive с aware — pandas упадёт с Cannot compare tz-naive and tz-aware. Лечится одним из двух:

# вариант 1: оба naive
df["t"] = df["t"].dt.tz_localize(None)

# вариант 2: оба aware в UTC
df["t"] = df["t"].dt.tz_localize("UTC")   # если был naive
df["t"] = df["t"].dt.tz_convert("UTC")    # если был с другой зоной

Правильная практика в ETL: при чтении сырых данных сразу tz_localize / tz_convert в UTC. Всё остальное — UTC. Только при выводе в отчёт пользователю переводите в локальную зону.

Праздники и календарь рабочих дней

Иногда нужно «следующий рабочий день» или «дни между двумя датами, исключая выходные».

# рабочие дни между двумя датами
pd.bdate_range("2025-01-01", "2025-01-14")
# DatetimeIndex(['2025-01-01', '2025-01-02', '2025-01-03', '2025-01-06', ...])
# (исключает субботу-воскресенье; праздники нужно задавать отдельно)

# через offsets — добавить 5 рабочих дней
from pandas.tseries.offsets import BDay
pd.Timestamp("2025-01-15") + BDay(5)

Для конкретных нац. праздников — библиотека holidays (extra dependency), а внутри pandas — CustomBusinessDay.

Шпаргалка datetime

Что нужноКоманда
Парсить строкиpd.to_datetime(series, utc=True)
Сгенерить датыpd.date_range(start, end, freq="D")
Год/месяц/часs.dt.year, s.dt.month, s.dt.hour
День неделиs.dt.dayofweek (Monday=0)
Округление до бакетаs.dt.floor("h") / "D" / "15min"
Группа по днюdf.resample("D").agg(...) (index — datetime)
Moving averages.rolling(7).mean()
WoW изменениеs.pct_change(7)
Прошлая точкаs.shift(1)
Сменить таймзонуs.dt.tz_convert("UTC")

Упражнение

Дано — event-stream events.csv:

event_time,user_id,amount
2025-01-01 09:30:00+00:00,1,100
2025-01-01 10:00:00+00:00,2,200
2025-01-02 08:15:00+00:00,1,50
2025-01-02 18:30:00+00:00,3,300
2025-01-08 11:00:00+00:00,1,150
2025-01-08 12:00:00+00:00,2,250

Напишите функцию daily_report(path) -> pd.DataFrame, которая возвращает таблицу с колонками:

  • revenue — сумма amount за день
  • n_events — число событий за день
  • revenue_ma3 — 3-дневный moving average (с min_periods=1)
  • revenue_wow — выручка неделя назад (shift(7))
  • wow_pctpct_change(7) * 100

Критерии приёмки:

  • event_time парсится с utc=True.
  • Используется resample("D") после set_index("event_time").
  • В дни без событий revenue=0, n_events=0.
  • Индекс результата — DatetimeIndex от 2025-01-01 до 2025-01-08 без пропусков (8 дней).

В следующем уроке — производительность, чтение Parquet/SQL, chunking больших файлов, и когда стоит смотреть в сторону polars.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4