expand_kwargs, zip, partial — продвинутые формы expansion
Базовый .expand(arg=iterable) (урок 02) даёт cartesian product, что не всегда нужно. В реальных production DAG чаще нужны:
- Explicit pairs — конкретные комбинации, не cross-multiplication
- Parallel zip — lockstep pairing двух lists
- Constants + dynamic — fixed arguments плюс одна меняющаяся ось
Airflow предоставляет три формы: .expand_kwargs(), XComArg.zip(), и .partial().expand(). В этом уроке препарируем каждую с production примерами.
.expand_kwargs() — list of dicts
.expand_kwargs(list_of_dicts) принимает явный список dict-ов. Каждый dict — это kwargs для одного mapped TI.
@task
def process(env: str, region: str, replicas: int):
print(f"{env}/{region} — {replicas} replicas")
process.expand_kwargs([
{"env": "dev", "region": "us", "replicas": 1},
{"env": "prod", "region": "us", "replicas": 5},
{"env": "prod", "region": "eu", "replicas": 3},
])
Результат: 3 mapped TI (не 2×2×3=12 как было бы у cartesian expand):
| map_index | env | region | replicas |
|---|---|---|---|
| 0 | dev | us | 1 |
| 1 | prod | us | 5 |
| 2 | prod | eu | 3 |
Use case: у вас уже есть готовые комбинации (из конфига, DB, или manual selection), не нужен cross-product.
От upstream XCom
.expand_kwargs(list) принимает также XComArg:
@task
def get_configs() -> list[dict]:
return [
{"env": "dev", "region": "us"},
{"env": "prod", "region": "eu"},
]
@task
def deploy(env: str, region: str):
print(f"Deploying {env}/{region}")
deploy.expand_kwargs(get_configs())
Scheduler reads XCom от get_configs, видит list of 2 dicts → creates 2 mapped TI. Каждая получает kwargs из соответствующего dict.
Validation на parse
Airflow inspects callable signature. Если в dict есть keys, которые не являются parameters callable — error на parse (Airflow 2.5+):
@task
def process(env: str):
pass
# ❌ Error: 'invalid_key' is not a parameter of process
process.expand_kwargs([{"env": "dev", "invalid_key": "value"}])
XComArg.zip() — parallel pairs
Когда у вас две parallel lists same length и нужны pairs (a[0]+b[0], a[1]+b[1]…):
from airflow.models.xcom_arg import XComArg
@task
def get_users() -> list[str]:
return ["alice", "bob", "carol"]
@task
def get_scores() -> list[int]:
return [90, 85, 92]
@task
def report(user: str, score: int):
print(f"{user}: {score}")
users = get_users()
scores = get_scores()
# Pair через zip + expand_kwargs:
report.expand_kwargs(
users.zip(scores).map(lambda pair: {"user": pair[0], "score": pair[1]})
)
Что произошло:
users.zip(scores)—ZipXComArglazy reference на[(alice, 90), (bob, 85), (carol, 92)].map(lambda pair: {...})— lazy transformation, превращает tuple в dictexpand_kwargs(...)— 3 mapped TI с правильными kwargs
zip с разной length
users.zip(scores) ведёт себя как Python zip() — обрезает по shortest. Если users 3 элемента, scores 2 — будет 2 pairs.
users.zip(scores, fillvalue=None) — pad shortest (как itertools.zip_longest).
.partial().expand() — constants + dynamic
Когда часть аргументов constants, часть dynamic — используйте .partial() для constants, .expand() для dynamic:
@task
def upload_to_s3(bucket: str, region: str, filename: str):
print(f"Upload {filename} → s3://{bucket} ({region})")
@task
def get_files() -> list[str]:
return ["a.csv", "b.csv", "c.csv"]
# bucket и region — constants для всех 3 mapped TI
# filename — dynamic, разный для каждой
upload_to_s3.partial(
bucket="my-bucket"
region="us-east-1",
).expand(
filename=get_files(),
)
3 mapped TI:
| map_index | bucket | region | filename |
|---|---|---|---|
| 0 | my-bucket | us-east-1 | a.csv |
| 1 | my-bucket | us-east-1 | b.csv |
| 2 | my-bucket | us-east-1 | c.csv |
Это не делает копию constants в XCom — они хранятся ОДИН раз в MappedOperator metadata, что эффективнее чем cartesian с 1-element list.
partial + expand_kwargs
Можно combine:
upload_to_s3.partial(
bucket="my-bucket",
).expand_kwargs(
[
{"region": "us-east-1", "filename": "a.csv"},
{"region": "eu-west-1", "filename": "b.csv"},
]
)
bucket константа, region+filename varying.
Combined: zip + map + partial
Real-world pattern:
@task
def get_users() -> list[str]:
return ["alice", "bob", "carol"]
@task
def get_emails() -> list[str]:
return ["[email protected]", "[email protected]", "[email protected]"]
@task
def send_notification(
smtp_host: str, # constant
from_addr: str, # constant
name: str, # dynamic — per map_index
email: str, # dynamic — per map_index
):
print(f"Send to {name} <{email}>")
names = get_users()
emails = get_emails()
send_notification.partial(
smtp_host="smtp.gmail.com"
from_addr="[email protected]",
).expand_kwargs(
names.zip(emails).map(lambda pair: {"name": pair[0], "email": pair[1]})
)
3 mapped TI, каждая с правильными name+email pair и общими smtp_host/from_addr.
Visualization сравнения
Comparison table
| Method | Input | Output count | Use case |
|---|---|---|---|
.expand(a=L) | one iterable | N=len(L) | Base case, simple |
.expand(a=L1, b=L2) | multiple iterables | N×M cartesian | All combinations |
.expand_kwargs(list) | list of dicts | N=len(list) | Pre-computed combos |
.zip(a, b) | two iterables | min(len) pairs | Lockstep parallel |
.partial(c=v).expand(...) | constants + dynamic | N=len(dynamic) | Mixed const/dynamic |
XComArg.map() vs @task
.map(fn) — lazy transformation на XComArg level. Применяется при resolve, не создаёт extra TI.
items = get_items() # XComArg
doubled = items.map(lambda x: x * 2) # lazy MapXComArgWithMap — НЕ TI
@task
def use(v): pass
use.expand(v=doubled)
Преимущества:
- Не создаёт extra task → не добавляет scheduling overhead
- Не пишет XCom для transformation result
Caveats:
- Lambda должна быть picklable (no closures с local vars)
- Сложные transformations — лучше явный
@task(debuggable, retryable, logged)
Когда .map() vs @task:
| Need | Use |
|---|---|
| Simple inline transform (subscript, arithmetic) | .map() |
| Complex logic | @task |
| Needs retry on failure | @task |
| Side effects (logging, DB write) | @task |
| Need logs | @task |
Production pattern: shard processing
Common pattern для DB sharding:
@task
def get_shards() -> list[dict]:
"""List active DB shards."""
return [
{"shard_id": 0, "host": "db-shard-0.internal", "user_range": "0-1M"},
{"shard_id": 1, "host": "db-shard-1.internal", "user_range": "1M-2M"},
{"shard_id": 2, "host": "db-shard-2.internal", "user_range": "2M-3M"},
]
@task
def process_shard(
shard_id: int,
host: str,
user_range: str,
target_table: str, # constant
extra_options: dict, # constant
):
print(f"Process shard {shard_id} → {target_table}")
# SELECT from host, INSERT into target_table
return {"shard_id": shard_id, "rows": 100000}
@task
def summarize(results: list[dict]):
total = sum(r["rows"] for r in results)
print(f"Total rows: {total}")
shards = get_shards()
results = process_shard.partial(
target_table="warehouse.users"
extra_options={"timeout": 300, "batch_size": 10000},
).expand_kwargs(shards)
summarize(results)
3 mapped TI, processing each shard parallel. Constants (target_table, extra_options) хранятся один раз. Каждая TI knows свой shard. Aggregation в summarize.
Multi-axis dynamic — when to flatten
Если нужно processing N envs × M regions × K services:
# ❌ Cartesian — 3×3×5 = 45 mapped TI, может быть OK
process.expand(env=envs, region=regions, service=services)
# ✅ Explicit list — только нужные комбинации, например 12 из 45
@task
def get_deploy_targets() -> list[dict]:
return [
{"env": "dev", "region": "us", "service": "api"},
# ... только живые комбинации ...
]
process.expand_kwargs(get_deploy_targets())
Cartesian красиво на бумаге, но в production обычно нужны только некоторые из cells matrix. Лучше явно перечислить — экономит ресурсы scheduler-а.
Production gotchas
Gotcha 1: expand_kwargs validation
Каждый dict должен иметь те же keys (parameters callable). Mixed schemas:
process.expand_kwargs([
{"a": 1, "b": 2},
{"a": 1}, # ❌ missing 'b' — error если @task требует b как required
])
Default value поможет: def process(a, b=None).
Gotcha 2: zip с разной length — silent труcнация
users = ["alice", "bob", "carol"]
scores = [90, 85] # короче на 1
users.zip(scores) # 2 pairs, carol silently dropped
Если хотите fail — assert before:
@task
def get_users() -> list:
users = fetch_users()
scores = fetch_scores()
assert len(users) == len(scores), f"Mismatch: {len(users)} vs {len(scores)}"
return list(zip(users, scores)) # tuples
Gotcha 3: partial constants хранятся в DB как часть serialized_dag
process.partial(
huge_config=load_10MB_config(), # ← хранится в metadata DB
).expand(...)
Constants serializes как часть mapped operator. Если 10MB config — DB row для serialized_dag растёт. Лучше передавать reference (file path, S3 URI), не value.
Gotcha 4: .map(lambda) с closure
my_var = "prefix"
items.map(lambda x: f"{my_var}_{x}") # ❌ closure capture my_var — pickling может fail
Lambda с closure — picklable если my_var picklable, но это fragile. Лучше:
items.map(lambda x: f"prefix_{x}") # ✅ no closure
Или явный @task.
Gotcha 5: chained .map().map()
items.map(fn1).map(fn2).map(fn3)
Работает — каждая .map оборачивает в новый MapXComArg. Но на resolve все три function calls происходят последовательно для каждого element. Если transformations heavy — extract в один @task.