Learning Platform
Глоссарий Troubleshooting
Урок 08.03 · 26 мин
Продвинутый
Dynamic Mappingexpand_kwargszippartialXComArg

expand_kwargs, zip, partial — продвинутые формы expansion

Базовый .expand(arg=iterable) (урок 02) даёт cartesian product, что не всегда нужно. В реальных production DAG чаще нужны:

  • Explicit pairs — конкретные комбинации, не cross-multiplication
  • Parallel zip — lockstep pairing двух lists
  • Constants + dynamic — fixed arguments плюс одна меняющаяся ось

Airflow предоставляет три формы: .expand_kwargs(), XComArg.zip(), и .partial().expand(). В этом уроке препарируем каждую с production примерами.


.expand_kwargs() — list of dicts

.expand_kwargs(list_of_dicts) принимает явный список dict-ов. Каждый dict — это kwargs для одного mapped TI.

@task
def process(env: str, region: str, replicas: int):
    print(f"{env}/{region}{replicas} replicas")

process.expand_kwargs([
    {"env": "dev",  "region": "us", "replicas": 1},
    {"env": "prod", "region": "us", "replicas": 5},
    {"env": "prod", "region": "eu", "replicas": 3},
])

Результат: 3 mapped TI (не 2×2×3=12 как было бы у cartesian expand):

map_indexenvregionreplicas
0devus1
1produs5
2prodeu3

Use case: у вас уже есть готовые комбинации (из конфига, DB, или manual selection), не нужен cross-product.

От upstream XCom

.expand_kwargs(list) принимает также XComArg:

@task
def get_configs() -> list[dict]:
    return [
        {"env": "dev", "region": "us"},
        {"env": "prod", "region": "eu"},
    ]

@task
def deploy(env: str, region: str):
    print(f"Deploying {env}/{region}")

deploy.expand_kwargs(get_configs())

Scheduler reads XCom от get_configs, видит list of 2 dicts → creates 2 mapped TI. Каждая получает kwargs из соответствующего dict.

Validation на parse

Airflow inspects callable signature. Если в dict есть keys, которые не являются parameters callable — error на parse (Airflow 2.5+):

@task
def process(env: str):
    pass

# ❌ Error: 'invalid_key' is not a parameter of process
process.expand_kwargs([{"env": "dev", "invalid_key": "value"}])

XComArg.zip() — parallel pairs

Когда у вас две parallel lists same length и нужны pairs (a[0]+b[0], a[1]+b[1]…):

from airflow.models.xcom_arg import XComArg

@task
def get_users() -> list[str]:
    return ["alice", "bob", "carol"]

@task
def get_scores() -> list[int]:
    return [90, 85, 92]

@task
def report(user: str, score: int):
    print(f"{user}: {score}")

users = get_users()
scores = get_scores()

# Pair через zip + expand_kwargs:
report.expand_kwargs(
    users.zip(scores).map(lambda pair: {"user": pair[0], "score": pair[1]})
)

Что произошло:

  • users.zip(scores)ZipXComArg lazy reference на [(alice, 90), (bob, 85), (carol, 92)]
  • .map(lambda pair: {...}) — lazy transformation, превращает tuple в dict
  • expand_kwargs(...) — 3 mapped TI с правильными kwargs

zip с разной length

users.zip(scores) ведёт себя как Python zip() — обрезает по shortest. Если users 3 элемента, scores 2 — будет 2 pairs.

users.zip(scores, fillvalue=None) — pad shortest (как itertools.zip_longest).


.partial().expand() — constants + dynamic

Когда часть аргументов constants, часть dynamic — используйте .partial() для constants, .expand() для dynamic:

@task
def upload_to_s3(bucket: str, region: str, filename: str):
    print(f"Upload {filename} → s3://{bucket} ({region})")

@task
def get_files() -> list[str]:
    return ["a.csv", "b.csv", "c.csv"]

# bucket и region — constants для всех 3 mapped TI
# filename — dynamic, разный для каждой
upload_to_s3.partial(
    bucket="my-bucket"
    region="us-east-1",
).expand(
    filename=get_files(),
)

3 mapped TI:

map_indexbucketregionfilename
0my-bucketus-east-1a.csv
1my-bucketus-east-1b.csv
2my-bucketus-east-1c.csv

Это не делает копию constants в XCom — они хранятся ОДИН раз в MappedOperator metadata, что эффективнее чем cartesian с 1-element list.

partial + expand_kwargs

Можно combine:

upload_to_s3.partial(
    bucket="my-bucket",
).expand_kwargs(
    [
        {"region": "us-east-1", "filename": "a.csv"},
        {"region": "eu-west-1", "filename": "b.csv"},
    ]
)

bucket константа, region+filename varying.


Combined: zip + map + partial

Real-world pattern:

@task
def get_users() -> list[str]:
    return ["alice", "bob", "carol"]

@task
def get_emails() -> list[str]:
    return ["[email protected]", "[email protected]", "[email protected]"]

@task
def send_notification(
    smtp_host: str,           # constant
    from_addr: str,           # constant
    name: str,                # dynamic — per map_index
    email: str,               # dynamic — per map_index
):
    print(f"Send to {name} <{email}>")

names = get_users()
emails = get_emails()

send_notification.partial(
    smtp_host="smtp.gmail.com"
    from_addr="[email protected]",
).expand_kwargs(
    names.zip(emails).map(lambda pair: {"name": pair[0], "email": pair[1]})
)

3 mapped TI, каждая с правильными name+email pair и общими smtp_host/from_addr.


Visualization сравнения

Формы expansion в TaskFlow
expand(arg=list)Один аргумент, iterable. Mapped по element-by-element. N mapped TI где N=len(list). Base case.
expand(a=L1, b=L2)Несколько args → cartesian product itertools.product. N×M mapped TI. map_index по lexicographic order (left-most меняется slowest). Быстро становится huge.
expand_kwargs([{...}, ...])Explicit list of dicts. Length=len(list). Pairs не cross-multiplied — точно те комбинации, что в list. Используется когда комбинации pre-computed.
users.zip(emails) + map + expand_kwargsParallel zip — pairs same index. Length=min(len(a),len(b)). Используется когда два list same length corresponding elements.
partial(const=...).expand(dynamic=...)partial — constant args для всех mapped TI (хранятся одиножды в MappedOperator metadata). expand — dynamic. Combine constants + dynamic — эффективнее cartesian с 1-element list.

Comparison table

MethodInputOutput countUse case
.expand(a=L)one iterableN=len(L)Base case, simple
.expand(a=L1, b=L2)multiple iterablesN×M cartesianAll combinations
.expand_kwargs(list)list of dictsN=len(list)Pre-computed combos
.zip(a, b)two iterablesmin(len) pairsLockstep parallel
.partial(c=v).expand(...)constants + dynamicN=len(dynamic)Mixed const/dynamic

XComArg.map() vs @task

.map(fn) — lazy transformation на XComArg level. Применяется при resolve, не создаёт extra TI.

items = get_items()                    # XComArg
doubled = items.map(lambda x: x * 2)   # lazy MapXComArgWithMap — НЕ TI

@task
def use(v): pass

use.expand(v=doubled)

Преимущества:

  • Не создаёт extra task → не добавляет scheduling overhead
  • Не пишет XCom для transformation result

Caveats:

  • Lambda должна быть picklable (no closures с local vars)
  • Сложные transformations — лучше явный @task (debuggable, retryable, logged)

Когда .map() vs @task:

NeedUse
Simple inline transform (subscript, arithmetic).map()
Complex logic@task
Needs retry on failure@task
Side effects (logging, DB write)@task
Need logs@task

Production pattern: shard processing

Common pattern для DB sharding:

@task
def get_shards() -> list[dict]:
    """List active DB shards."""
    return [
        {"shard_id": 0, "host": "db-shard-0.internal", "user_range": "0-1M"},
        {"shard_id": 1, "host": "db-shard-1.internal", "user_range": "1M-2M"},
        {"shard_id": 2, "host": "db-shard-2.internal", "user_range": "2M-3M"},
    ]

@task
def process_shard(
    shard_id: int,
    host: str,
    user_range: str,
    target_table: str,   # constant
    extra_options: dict, # constant
):
    print(f"Process shard {shard_id}{target_table}")
    # SELECT from host, INSERT into target_table
    return {"shard_id": shard_id, "rows": 100000}

@task
def summarize(results: list[dict]):
    total = sum(r["rows"] for r in results)
    print(f"Total rows: {total}")

shards = get_shards()
results = process_shard.partial(
    target_table="warehouse.users"
    extra_options={"timeout": 300, "batch_size": 10000},
).expand_kwargs(shards)
summarize(results)

3 mapped TI, processing each shard parallel. Constants (target_table, extra_options) хранятся один раз. Каждая TI knows свой shard. Aggregation в summarize.


Multi-axis dynamic — when to flatten

Если нужно processing N envs × M regions × K services:

# ❌ Cartesian — 3×3×5 = 45 mapped TI, может быть OK
process.expand(env=envs, region=regions, service=services)

# ✅ Explicit list — только нужные комбинации, например 12 из 45
@task
def get_deploy_targets() -> list[dict]:
    return [
        {"env": "dev", "region": "us", "service": "api"},
        # ... только живые комбинации ...
    ]

process.expand_kwargs(get_deploy_targets())

Cartesian красиво на бумаге, но в production обычно нужны только некоторые из cells matrix. Лучше явно перечислить — экономит ресурсы scheduler-а.


Production gotchas

Gotcha 1: expand_kwargs validation

Каждый dict должен иметь те же keys (parameters callable). Mixed schemas:

process.expand_kwargs([
    {"a": 1, "b": 2},
    {"a": 1},          # ❌ missing 'b' — error если @task требует b как required
])

Default value поможет: def process(a, b=None).

Gotcha 2: zip с разной length — silent труcнация

users = ["alice", "bob", "carol"]
scores = [90, 85]   # короче на 1

users.zip(scores)   # 2 pairs, carol silently dropped

Если хотите fail — assert before:

@task
def get_users() -> list:
    users = fetch_users()
    scores = fetch_scores()
    assert len(users) == len(scores), f"Mismatch: {len(users)} vs {len(scores)}"
    return list(zip(users, scores))   # tuples

Gotcha 3: partial constants хранятся в DB как часть serialized_dag

process.partial(
    huge_config=load_10MB_config(),   # ← хранится в metadata DB
).expand(...)

Constants serializes как часть mapped operator. Если 10MB config — DB row для serialized_dag растёт. Лучше передавать reference (file path, S3 URI), не value.

Gotcha 4: .map(lambda) с closure

my_var = "prefix"

items.map(lambda x: f"{my_var}_{x}")   # ❌ closure capture my_var — pickling может fail

Lambda с closure — picklable если my_var picklable, но это fragile. Лучше:

items.map(lambda x: f"prefix_{x}")    # ✅ no closure

Или явный @task.

Gotcha 5: chained .map().map()

items.map(fn1).map(fn2).map(fn3)

Работает — каждая .map оборачивает в новый MapXComArg. Но на resolve все три function calls происходят последовательно для каждого element. Если transformations heavy — extract в один @task.


Проверка знанийKnowledge check
DAG нужно deploy 12 specific service+region комбинаций из 45 возможных (3 env × 3 region × 5 service cartesian). Какой подход и почему?
ОтветAnswer
Используйте `.expand_kwargs(list_of_dicts)`, не cartesian `.expand(env=..., region=..., service=...)`. Cartesian создаст все 45 mapped TI, из которых 33 — wasted (если хотите только 12 deploy комбинаций). Это (1) расходует scheduler resources, (2) создаёт 33 ненужных task_instance row + xcom row, (3) downstream aggregation pulls 45 values вместо 12, (4) приближает к max_map_length limit. Pattern: `@task def get_deploy_targets() -> list[dict]: return [{'env':'dev','region':'us','service':'api'}, ...]` со списком только нужных 12. Затем `process.expand_kwargs(get_deploy_targets())` — точно 12 mapped TI с правильными kwargs. Если есть constants (например target_account, audit_log): `.partial(target_account=..., audit_log=...).expand_kwargs(get_deploy_targets())` — constants хранятся одиножды в MappedOperator metadata, не дублируются 12 раз. Bonus: validation в get_deploy_targets — assert валидные комбинации, fail fast если конфиг сломан, до создания mapped TI.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. DAG нужно deploy 12 specific service+region+env комбинаций из 45 cartesian. Какой подход правильный?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6