@@ -56,6 +56,16 @@ def test_dbt_run_mocked_all_args():
5656 assert config .state == Path ("/path/to/state/" )
5757
5858
59+ class FakeTaskInstance :
60+ """Fake TaskInstance that stores result in XCom dict."""
61+
62+ def __init__ (self ):
63+ self .xcom = {}
64+
65+ def xcom_push (self , key , value , execution_date ):
66+ self .xcom [key ] = (value , execution_date )
67+
68+
5969def test_dbt_run_non_existent_model (profiles_file , dbt_project_file , model_files ):
6070 """Test execution of DbtRunOperator with a non-existent model."""
6171 op = DbtRunOperator (
@@ -68,6 +78,7 @@ def test_dbt_run_non_existent_model(profiles_file, dbt_project_file, model_files
6878 )
6979
7080 execution_results = op .execute ({})
81+
7182 assert len (execution_results ["results" ]) == 0
7283 assert isinstance (json .dumps (execution_results ), str )
7384
@@ -81,6 +92,7 @@ def test_dbt_run_models(profiles_file, dbt_project_file, model_files):
8192 models = [str (m .stem ) for m in model_files ],
8293 do_xcom_push = True ,
8394 )
95+
8496 execution_results = op .execute ({})
8597 run_result = execution_results ["results" ][0 ]
8698
@@ -160,11 +172,17 @@ def test_dbt_run_models_from_s3(
160172 profiles_dir = f"s3://{ s3_bucket } /project/" ,
161173 models = [str (m .stem ) for m in model_files ],
162174 do_xcom_push = True ,
175+ do_xcom_push_artifacts = ["manifest.json" , "run_results.json" ],
163176 )
164- execution_results = op .execute ({})
177+ ti = FakeTaskInstance ()
178+
179+ execution_results = op .execute ({"ti" : ti })
165180 run_result = execution_results ["results" ][0 ]
166181
167182 assert run_result ["status" ] == RunStatus .Success
183+ assert "manifest.json" in ti .xcom
184+ assert "run_results.json" in ti .xcom
185+ assert ti .xcom ["run_results.json" ][0 ]["results" ][0 ]["status" ] == "success"
168186
169187
170188@no_s3_hook
0 commit comments