```python
# task_instance is the "ti" entry of a task callable's context (kwargs["ti"])
count = task_instance.xcom_pull(key="row_count", task_ids="transform_task")
```
✅ No explicit push/pull: return values flow between tasks automatically.
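To make the push/pull mechanics concrete, here is a minimal in-memory model. It is a sketch, not Airflow's implementation: entries are keyed by (dag_id, run_id, task_id, key), and a task's non-None return value is stored under `return_value`, which is the key Airflow uses for returned values and the default key of `xcom_pull`. The `XComStore` class and `run_task` helper are hypothetical names for illustration.

```python
from typing import Any, Callable, Dict, Optional, Tuple

# Airflow stores a task's return value under this key
RETURN_KEY = "return_value"


class XComStore:
    """Hypothetical in-memory stand-in for the xcom table (not Airflow's API)."""

    def __init__(self) -> None:
        # (dag_id, run_id, task_id, key) -> value
        self._rows: Dict[Tuple[str, str, str, str], Any] = {}

    def push(self, dag_id: str, run_id: str, task_id: str, key: str, value: Any) -> None:
        self._rows[(dag_id, run_id, task_id, key)] = value

    def pull(self, dag_id: str, run_id: str, task_id: str,
             key: str = RETURN_KEY) -> Optional[Any]:
        # Like xcom_pull for a single task, a missing entry yields None
        return self._rows.get((dag_id, run_id, task_id, key))


def run_task(store: XComStore, dag_id: str, run_id: str, task_id: str,
             fn: Callable[[], Any]) -> None:
    # Hypothetical runner: a non-None return value is pushed automatically,
    # so the task body never calls push itself
    result = fn()
    if result is not None:
        store.push(dag_id, run_id, task_id, RETURN_KEY, result)


store = XComStore()
run_task(store, "demo", "run_1", "extract_task", lambda: "/tmp/data_2023.csv")
store.push("demo", "run_1", "transform_task", "row_count", 42)  # explicit push
print(store.pull("demo", "run_1", "extract_task"))  # /tmp/data_2023.csv
print(store.pull("demo", "run_1", "transform_task", key="row_count"))  # 42
```

Keying every entry by `run_id` mirrors the fact that XComs are scoped to a single DAG run, so two concurrent runs never read each other's values.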
Pass exclusive keys to triggered DAGs:
```python
def extract_data(**kwargs):
    # extraction logic goes here
    file_path = "/tmp/data_2023.csv"
    return file_path  # This is automatically pushed to XCom
```
Or point XCom at Redis through a custom XCom backend (install apache-airflow-providers-redis, which provides RedisHook for the connection):
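The idea behind an external backend is that only a short reference lands in the metadata database while the payload lives elsewhere. In Airflow, a custom backend subclasses `BaseXCom` and overrides `serialize_value`/`deserialize_value`; the sketch below models the same split with a dictionary standing in for Redis, so it runs without Airflow or a Redis server. `FakeRedis`, `ReferenceBackend`, and the `xcom-ref:` prefix are hypothetical names, and the methods are simplified instance methods rather than Airflow's real signatures.

```python
import json
import uuid
from typing import Any, Dict

REF_PREFIX = "xcom-ref:"  # hypothetical marker for externally stored values


class FakeRedis:
    """Dict-backed stand-in for a Redis client (set/get only)."""

    def __init__(self) -> None:
        self._data: Dict[str, bytes] = {}

    def set(self, name: str, value: bytes) -> None:
        self._data[name] = value

    def get(self, name: str) -> bytes:
        # Raises KeyError on a missing key; redis-py would return None instead
        return self._data[name]


class ReferenceBackend:
    """Stores payloads externally and keeps only a reference for the DB row."""

    def __init__(self, external: FakeRedis) -> None:
        self.external = external

    def serialize_value(self, value: Any) -> str:
        payload = json.dumps(value).encode("utf-8")
        ref = REF_PREFIX + uuid.uuid4().hex
        self.external.set(ref, payload)
        return ref  # only this short string would go into the xcom table

    def deserialize_value(self, stored: str) -> Any:
        if stored.startswith(REF_PREFIX):
            return json.loads(self.external.get(stored).decode("utf-8"))
        return json.loads(stored)  # value that was stored inline as JSON


backend = ReferenceBackend(FakeRedis())
rows = [{"id": i, "amount": i * 10} for i in range(1000)]
stored = backend.serialize_value(rows)
print(len(stored))  # 41: 9-character prefix + 32 hex characters
```

However large the payload, the row in the metadata database stays a fixed-size reference; the trade-off is that the external store now needs its own retention and cleanup.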
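```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# extract, transform and load are task callables assumed to be defined above
with DAG('exclusive_xcom_demo', start_date=datetime(2023, 1, 1), schedule=None) as dag:
    t1 = PythonOperator(task_id='extract', python_callable=extract)
    t2 = PythonOperator(task_id='transform', python_callable=transform)
    t3 = PythonOperator(task_id='load', python_callable=load)
    # Ordering lets each task pull what its upstream task pushed
    t1 >> t2 >> t3
```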
| Metric | Standard XCom | Exclusive Mode (Redis backend + key scoping) |
|--------|---------------|------------------------------------------------|
| Metadata DB size | 4.2 GB | 120 MB (only references) |
| Avg. task pull latency | 85 ms | 12 ms |
| Concurrent DAG runs | Limited by DB lock | 3x higher throughput |
| Debug time (random error) | 45 min | 8 min (clear lineage) |
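Example (pseudocode):

```python
from airflow.exceptions import AirflowException
from airflow.models.xcom import BaseXCom


class ExclusiveXCom(BaseXCom):  # hypothetical custom backend name
    # Hypothetical allow-list: (dag_id, calling_task) -> keys it may pull,
    # e.g. {("exclusive_xcom_demo", "load"): ["row_count"]}
    ALLOWED_PULLS: dict = {}

    @classmethod
    def get_value(cls, key, dag_id, task_id, run_id, map_index):
        # Simplified signature for illustration; BaseXCom's real get_value differs
        # Enforce exclusive pull: only if (dag_id, calling_task, target_task) is allowed
        calling_task = task_id  # Note: in a real implementation, you'd need to resolve the caller
        allowed_keys = cls.ALLOWED_PULLS.get((dag_id, calling_task), [])
        if key not in allowed_keys:
            raise AirflowException(
                f"XCom exclusive violation: Task {calling_task} not allowed to pull key '{key}'"
            )
        return super().get_value(key, dag_id, task_id, run_id, map_index)
```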
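Because the pseudocode above cannot tell which task is doing the pulling, the standalone sketch below makes the caller an explicit argument and checks the full (dag_id, calling_task, target_task) triple named in its comment. It is a plain-Python model of the allow-list idea, not an Airflow API; `ExclusivePullPolicy`, `check_pull`, and raising `PermissionError` are assumptions for illustration.

```python
from typing import Dict, FrozenSet, Iterable, Tuple

# (dag_id, calling_task, target_task) -> keys the caller may pull
PolicyKey = Tuple[str, str, str]


class ExclusivePullPolicy:
    def __init__(self, grants: Dict[PolicyKey, Iterable[str]]) -> None:
        # Freeze the grants so the policy cannot be mutated at runtime
        self._grants: Dict[PolicyKey, FrozenSet[str]] = {
            k: frozenset(v) for k, v in grants.items()
        }

    def check_pull(self, dag_id: str, caller: str, target: str, key: str) -> None:
        # Anything not explicitly granted is denied
        allowed = self._grants.get((dag_id, caller, target), frozenset())
        if key not in allowed:
            raise PermissionError(
                f"XCom exclusive violation: task {caller} is not allowed "
                f"to pull key '{key}' from task {target}"
            )


policy = ExclusivePullPolicy({
    ("exclusive_xcom_demo", "load", "transform"): ["row_count"],
})
policy.check_pull("exclusive_xcom_demo", "load", "transform", "row_count")  # allowed
try:
    policy.check_pull("exclusive_xcom_demo", "extract", "transform", "row_count")
except PermissionError as err:
    print(err)
```

Denying by default keeps the lineage explicit: every cross-task read must appear in the grants table, which is what makes a bad pull easy to trace.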
XComs are not a general-purpose data storage solution. They are meant for small values passed between tasks, such as a file path or a row count, and their limits follow from where that data is stored.
By default, Airflow writes XCom data directly into its metadata database (SQLAlchemy-supported databases like PostgreSQL, MySQL, or SQLite). The data is serialized into text (typically JSON) and stored in the xcom table.
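A rough way to picture this default behaviour is a single table of JSON text. The sketch below uses Python's built-in `sqlite3` with a simplified, hypothetical schema (Airflow's real `xcom` table has more columns, and its value type varies by version), and compares pushing a file path with pushing the data itself.

```python
import json
import sqlite3
from typing import Any

conn = sqlite3.connect(":memory:")
# Simplified, hypothetical schema; not Airflow's actual xcom table definition
conn.execute(
    "CREATE TABLE xcom (dag_id TEXT, run_id TEXT, task_id TEXT, key TEXT, "
    "value TEXT, PRIMARY KEY (dag_id, run_id, task_id, key))"
)


def push(dag_id: str, run_id: str, task_id: str, key: str, value: Any) -> None:
    # Serialize to JSON text before storing it in the metadata table
    conn.execute(
        "INSERT OR REPLACE INTO xcom VALUES (?, ?, ?, ?, ?)",
        (dag_id, run_id, task_id, key, json.dumps(value)),
    )


def pull(dag_id: str, run_id: str, task_id: str, key: str) -> Any:
    row = conn.execute(
        "SELECT value FROM xcom WHERE dag_id=? AND run_id=? AND task_id=? AND key=?",
        (dag_id, run_id, task_id, key),
    ).fetchone()
    return None if row is None else json.loads(row[0])


def stored_size(task_id: str) -> int:
    # Number of characters of JSON text stored for this task
    (size,) = conn.execute(
        "SELECT LENGTH(value) FROM xcom WHERE task_id=?", (task_id,)
    ).fetchone()
    return size


push("demo", "run_1", "extract_path", "return_value", "/tmp/data_2023.csv")
push("demo", "run_1", "extract_rows", "return_value", [[i, i * 2.5] for i in range(10_000)])
print(stored_size("extract_path"), stored_size("extract_rows"))
```

The path costs 20 characters of JSON, while the list of rows runs to well over 100,000, and every such row lives in the same database the scheduler depends on.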