|
100 | 100 |
|
101 | 101 | {% macro py_write_table(compiled_code, target_relation, temporary=False) %} |
102 | 102 | {{ compiled_code.replace(model.raw_code, "", 1) }} |
103 | | - def materialize(df, table, session): |
104 | | - if isinstance(df, pd.core.frame.DataFrame): |
105 | | - oml.create(df, table=table) |
106 | | - elif isinstance(df, oml.core.frame.DataFrame): |
107 | | - df.materialize(table=table) |
108 | | - |
109 | | - dbt = dbtObj(load_df_function=oml.sync) |
110 | | - final_df = model(dbt, session=oml) |
111 | | - |
112 | | - {{ log("Python model materialization is " ~ model.config.materialized, info=True) }} |
113 | | - {% if model.config.materialized.lower() == 'table' %} |
114 | | - table_name = f"{dbt.this.identifier}__dbt_tmp" |
115 | | - {% else %} |
116 | | - # incremental materialization |
117 | | - {% if temporary %} |
118 | | - table_name = "{{target_relation.identifier}}" |
119 | | - {% else %} |
120 | | - table_name = dbt.this.identifier |
121 | | - {% endif %} |
122 | | - {% endif %} |
123 | | - materialize(final_df, table=table_name.upper(), session=oml) |
124 | | - return pd.DataFrame.from_dict({"result": [1]}) |
| 103 | + try: |
| 104 | + dbt = dbtObj(load_df_function=oml.sync) |
| 105 | + set_connection_attributes() |
| 106 | + final_df = model(dbt, session=oml) |
| 107 | + {{ log("Python model materialization is " ~ model.config.materialized, info=True) }} |
| 108 | + {% if model.config.materialized.lower() == 'table' %} |
| 109 | + table_name = f"{dbt.this.identifier}__dbt_tmp" |
| 110 | + {% else %} |
| 111 | + # incremental materialization |
| 112 | + {% if temporary %} |
| 113 | + table_name = "{{target_relation.identifier}}" |
| 114 | + {% else %} |
| 115 | + table_name = dbt.this.identifier |
| 116 | + {% endif %} |
| 117 | + {% endif %} |
| 118 | + materialize(final_df, table=table_name.upper(), session=oml) |
| 119 | + return pd.DataFrame.from_dict({"result": [1]}) |
| 120 | + except Exception: |
| 121 | + raise |
| 122 | + finally: |
| 123 | + connection = oml.core.methods._get_conn() |
| 124 | + connection.close() |
125 | 125 | {% endmacro %} |
0 commit comments