diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py index 4deb996de08ea..96bc0eb464c63 100644 --- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py +++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py @@ -279,35 +279,29 @@ def _default_conn_name_from(mod_path, hook_name): ) remote_task_handler_kwargs = {} elif ELASTICSEARCH_HOST: - ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK") - ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend") + from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO + ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT") ELASTICSEARCH_WRITE_TO_ES: bool = conf.getboolean("elasticsearch", "WRITE_TO_ES") ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT") - ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS") ELASTICSEARCH_TARGET_INDEX: str = conf.get_mandatory_value("elasticsearch", "TARGET_INDEX") ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD") ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD") + ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get_mandatory_value("elasticsearch", "LOG_ID_TEMPLATE") + + REMOTE_TASK_LOG = ElasticsearchRemoteLogIO( + host=ELASTICSEARCH_HOST, + target_index=ELASTICSEARCH_TARGET_INDEX, + write_stdout=ELASTICSEARCH_WRITE_STDOUT, + write_to_es=ELASTICSEARCH_WRITE_TO_ES, + offset_field=ELASTICSEARCH_OFFSET_FIELD, + host_field=ELASTICSEARCH_HOST_FIELD, + base_log_folder=BASE_LOG_FOLDER, + delete_local_copy=delete_local_copy, + json_format=ELASTICSEARCH_JSON_FORMAT, + log_id_template=ELASTICSEARCH_LOG_ID_TEMPLATE, + ) - ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = { - "task": { - "class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler", - "formatter": "airflow", - "base_log_folder": BASE_LOG_FOLDER, - "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK, - "host": ELASTICSEARCH_HOST, - "frontend": ELASTICSEARCH_FRONTEND, - "write_stdout": ELASTICSEARCH_WRITE_STDOUT, - "write_to_es": ELASTICSEARCH_WRITE_TO_ES, - "target_index": ELASTICSEARCH_TARGET_INDEX, - "json_format": ELASTICSEARCH_JSON_FORMAT, - "json_fields": ELASTICSEARCH_JSON_FIELDS, - "host_field": ELASTICSEARCH_HOST_FIELD, - "offset_field": ELASTICSEARCH_OFFSET_FIELD, - }, - } - - DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS) elif OPENSEARCH_HOST: OPENSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("opensearch", "END_OF_LOG_MARK") OPENSEARCH_PORT: str = conf.get_mandatory_value("opensearch", "PORT") diff --git a/scripts/ci/prek/update_airflow_pyproject_toml.py b/scripts/ci/prek/update_airflow_pyproject_toml.py index b3bf8369e0d97..1477fd2a3863c 100755 --- a/scripts/ci/prek/update_airflow_pyproject_toml.py +++ b/scripts/ci/prek/update_airflow_pyproject_toml.py @@ -79,6 +79,7 @@ "git": parse_version("0.0.2"), "common.messaging": parse_version("2.0.0"), "informatica": parse_version("0.1.0"), + "elasticsearch": parse_version("6.5.0"), }