diff --git a/iotfunctions/__init__.py b/iotfunctions/__init__.py
index ca7a224e..9ffac57e 100644
--- a/iotfunctions/__init__.py
+++ b/iotfunctions/__init__.py
@@ -11,5 +11,5 @@
 import os
 import pkgutil
 
-__version__ = '8.8.0'
+__version__ = '8.9.19'
 __all__ = list(module for (_, module, _) in pkgutil.iter_modules([os.path.dirname(__file__)]))
diff --git a/iotfunctions/anomaly.py b/iotfunctions/anomaly.py
index 1814c50b..fd9d1e84 100644
--- a/iotfunctions/anomaly.py
+++ b/iotfunctions/anomaly.py
@@ -18,6 +18,7 @@
 import logging
 import time
 import hashlib  # encode feature names
+import traceback
 
 import numpy as np
 import pandas as pd
@@ -97,6 +98,10 @@
 Saliency_normalizer = 1
 Generalized_normalizer = 1 / 300
 
+# Do away with numba logs
+numba_logger = logging.getLogger('numba')
+numba_logger.setLevel(logging.ERROR)
+
 # from
 # https://stackoverflow.com/questions/44790072/sliding-window-on-time-series-data
 def view_as_windows1(temperature, length, step):
@@ -261,27 +266,6 @@ def transform_spectral_residual(self, values):
         return spectral_residual
 
 
-def merge_score(dfEntity, dfEntityOrig, column_name, score, mindelta):
-    """
-    Fit interpolated score to original entity slice of the full dataframe
-    """
-
-    # equip score with time values, make sure it's positive
-    score[score < 0] = 0
-    dfEntity[column_name] = score
-
-    # merge
-    dfEntityOrig = pd.merge_asof(dfEntityOrig, dfEntity[column_name], left_index=True, right_index=True,
-                                 direction='nearest', tolerance=mindelta)
-
-    if column_name + '_y' in dfEntityOrig:
-        merged_score = dfEntityOrig[column_name + '_y'].to_numpy()
-    else:
-        merged_score = dfEntityOrig[column_name].to_numpy()
-
-    return merged_score
-
-
 #######################################################################################
 # Scalers
 #######################################################################################
@@ -563,7 +547,8 @@ def prepare_data(self, dfEntity):
 
         # interpolate gaps - data imputation
         try:
-            dfe = dfe.dropna(subset=[self.input_item]).interpolate(method="time")
+            #dfe = dfe.dropna(subset=[self.input_item]).interpolate(method="time")
+            dfe = dfe.interpolate(method="time")
         except Exception as e:
             logger.error('Prepare data error: ' + str(e))
 
@@ -611,10 +596,15 @@ def _calc(self, df):
 
         # remove all rows with only null entries
         dfe = dfe_orig.dropna(how='all')
+        logger.info('Anomaly ' + str(df[self.output_items[0]].values.shape) + ', ' +
+                    str(dfe_orig[self.output_items[0]].values.shape) + ', ' +
+                    str(dfe[self.output_items[0]].values.shape))
 
 
         # minimal time delta for merging
         mindelta, dfe_orig = min_delta(dfe_orig)
+        logger.info('Anomaly II ' + str(dfe_orig[self.output_items[0]].values.shape))
+
         logger.debug('Timedelta:' + str(mindelta) + ' Index: ' + str(dfe_orig.index))
 
         # one dimensional time series - named temperature for catchyness
@@ -658,8 +648,25 @@ def _calc(self, df):
 
                     linear_interpolate = sp.interpolate.interp1d(time_series_temperature, scores[i], kind='linear',
                                                                  fill_value='extrapolate')
-                    zScoreII = merge_score(dfe, dfe_orig, output_item,
-                                           abs(linear_interpolate(np.arange(0, temperature.size, 1))), mindelta)
+                    # stretch anomaly score to fit temperature.size
+                    score = abs(linear_interpolate(np.arange(0, temperature.size, 1)))
+
+                    # and make sure it's positive
+                    score[score < 0] = 0
+
+                    dfe[output_item] = score
+
+                    # merge so that data is stretched to match the original data w/o gaps and NaNs
+                    dfe_orig = pd.merge_asof(dfe_orig, dfe[output_item], left_index=True, right_index=True,
+                                             direction='nearest', tolerance=mindelta)
+
+                    if output_item + '_y' in dfe_orig:
+                        zScoreII = dfe_orig[output_item + '_y'].to_numpy()
+                    else:
+                        zScoreII = dfe_orig[output_item].to_numpy()
+
+                    logger.debug('Merge Score : ' + str(score.shape) + ', ' + str(zScoreII.shape))
+
                 # fast path - either cut off or just copy
                 elif diff < 0:
                     zScoreII = scores[i][0:temperature.size]
@@ -669,12 +676,12 @@ def _calc(self, df):
                 # make sure shape is correct
                 try:
                     df[output_item] = zScoreII
-                except Exception as e2:
+                except Exception as e2:
                     df[output_item] = zScoreII.reshape(-1,1)
                     pass
 
         except Exception as e:
-            logger.error(self.whoami + ' score integration failed with ' + str(e))
+            logger.error(self.whoami + ' score integration failed with ' + str(e) + '\n' + traceback.format_exc())
 
         logger.debug('--->')
 
diff --git a/iotfunctions/db.py b/iotfunctions/db.py
index 7ac01e80..351e4f37 100644
--- a/iotfunctions/db.py
+++ b/iotfunctions/db.py
@@ -503,6 +503,7 @@ def __init__(self, credentials=None, start_session=False, echo=False, tenant_id=
             sqlalchemy_dialect_kwargs = {}
 
         # Establish database connection via sqlalchemy
+        logger.info('Establishing database connection via SqlAlchemy.')
         sqlalchemy_connection_kwargs = {**sqlalchemy_dialect_kwargs, **sqlalchemy_connection_kwargs}
         self.connection = create_engine(sqlalchemy_connection_string, echo=echo, **sqlalchemy_connection_kwargs)
 
@@ -518,13 +519,14 @@ def __init__(self, credentials=None, start_session=False, echo=False, tenant_id=
         else:
             self.session = None
         self.metadata = MetaData(self.connection)
-        logger.debug('Database connection via SqlAlchemy established.')
+        logger.info('Database connection via SqlAlchemy established.')
 
         # Establish native database connection (for DB2 and PostgreSQL only)
+        logger.info('Establishing native database connection.')
         if self.db_type == 'db2':
-            self.native_connection = ibm_db.connect(native_connection_string, '', '')
+            self.native_connection = self.connect_to_db2(native_connection_string)
             self.native_connection_dbi = ibm_db_dbi.Connection(self.native_connection)
-            logger.debug('Native database connection to DB2 established.')
+            logger.info('Native database connection to DB2 established.')
 
         elif self.db_type == 'postgresql':
             cred = self.credentials['postgresql']
@@ -532,7 +534,7 @@ def __init__(self, credentials=None, start_session=False, echo=False, tenant_id=
                                                        host=cred['host'], port=cred['port'], database=cred['db'],
                                                        application_name="AS %s Native Connection" % self.application_name)
             self.native_connection_dbi = self.native_connection
-            logger.debug('Native database connection to PostgreSQL established.')
+            logger.info('Native database connection to PostgreSQL established.')
         else:
             self.native_connection = None
             self.native_connection_dbi = None
@@ -609,6 +611,27 @@ def __init__(self, credentials=None, start_session=False, echo=False, tenant_id=
         else:
             logger.info(f"Data Dictionary is not available.")
 
+    def connect_to_db2(self, native_connection_string):
+        time_out = pd.Timestamp.utcnow() + pd.Timedelta(value=45, unit='seconds')
+        connection_attempt = 0
+        connection = None
+        while connection is None and time_out > pd.Timestamp.utcnow():
+            connection_attempt += 1
+            if connection_attempt > 2:
+                # Delay execution of each attempt as follows (attempt/seconds): 1/0, 2/0, 3/4, 4/8, 5/16, 6/32
+                time.sleep(2**(connection_attempt-1))
+            try:
+                connection = ibm_db.connect(native_connection_string, '', '')
+            except Exception:
+                logger.error(f"Attempt #{connection_attempt} to connect to DB2 failed.", exc_info=True)
+
+        if connection is None:
+            raise ConnectionError(f"DB2 connection could not be established in {connection_attempt} attempts.")
+        else:
+            logger.debug(f"DB2 connection was established at attempt #{connection_attempt}.")
+
+        return connection
+
     def _aggregate_item(self, table, column_name, aggregate, alias_column=None, dimension_table=None,
                         timestamp_col=None):
 
diff --git a/iotfunctions/metadata.py b/iotfunctions/metadata.py
index ffcd964f..d9b6f5df 100644
--- a/iotfunctions/metadata.py
+++ b/iotfunctions/metadata.py
@@ -931,7 +954,7 @@ def get_entity_type(self, entity_type_id):
 
Oops
 
         df.rename(columns=db_col_name_to_data_item_name, inplace=True)
+        # Repeat mapping taking into account upper case to lower case conversion of sqlalchemy
+        tmp_mapping = {col_name.lower(): name for col_name, name in db_col_name_to_data_item_name.items()}
+        df.rename(columns=tmp_mapping, inplace=True)
+
         return df
 
-    def get_data_with_col_names(self, start_ts=None, end_ts=None, entities=None, columns=None):
+    def get_data_with_col_names(self, start_ts=None, end_ts=None, entities=None, columns=None, agg_rules=None, agg_outputs=None):
         """
         Retrieve entity data at input grain or preaggregated
         """
 
@@ -1250,26 +1280,26 @@ def get_data_with_col_names(self, start_ts=None, end_ts=None, entities=None, col
 
             # make sure each column is in the aggregate dictionary
             # apply a default aggregate for each column not specified in the aggregation metadata
-            if self._pre_agg_rules is None:
-                self._pre_agg_rules = {}
-                self._pre_agg_outputs = {}
+            if agg_rules is None:
+                agg_rules = {}
+                agg_outputs = {}
             for c in columns:
                 try:
-                    self._pre_agg_rules[c]
+                    agg_rules[c]
                 except KeyError:
                     if c not in [self._timestamp, self._entity_id]:
                         if c in metrics:
-                            self._pre_agg_rules[c] = 'mean'
-                            self._pre_agg_outputs[c] = 'mean_%s' % c
+                            agg_rules[c] = 'mean'
+                            agg_outputs[c] = 'mean_%s' % c
                         else:
-                            self._pre_agg_rules[c] = 'max'
-                            self._pre_agg_outputs[c] = 'max_%s' % c
+                            agg_rules[c] = 'max'
+                            agg_outputs[c] = 'max_%s' % c
                     else:
                         pass
 
             df = self.db.read_agg(table_name=self.name, schema=self._db_schema, groupby=[self._entity_id],
                                   timestamp=self._timestamp, time_grain=self._pre_aggregate_time_grain,
-                                  agg_dict=self._pre_agg_rules, agg_outputs=self._pre_agg_outputs, start_ts=start_ts,
+                                  agg_dict=agg_rules, agg_outputs=agg_outputs, start_ts=start_ts,
                                   end_ts=end_ts, entities=entities, dimension=self._dimension_table_name)
 
             tw['pre-aggregeted'] = self._pre_aggregate_time_grain
@@ -1284,6 +1314,10 @@ def get_data_with_col_names(self, start_ts=None, end_ts=None, entities=None, col
             memo = MemoryOptimizer()
             df = memo.downcastNumeric(df)
         try:
+            if self._entity_id in df.columns and self._df_index_entity_id not in df.columns:
+                df[self._df_index_entity_id] = df[self._entity_id]
+            if self._timestamp.lower() in df.columns:
+                df.rename(columns={self._timestamp.lower(): self._timestamp}, inplace=True)
             df = self.index_df(df)
         except (AttributeError, KeyError):
             pass
diff --git a/tests/test_base_functions.py b/tests/test_base_functions.py
index fb8852c0..d875ff92 100644
--- a/tests/test_base_functions.py
+++ b/tests/test_base_functions.py
@@ -49,10 +49,10 @@ def test_base_functions():
     df_i['Test2'] = df_i[Temperature] + addl
     df_i['Test3'] = df_i[Temperature] + addl
     df_i['Test4'] = df_i[Temperature] + addl
-    df_i['Test1'][3] = None
-    df_i['Test2'][2] = None
-    df_i['Test2'][3] = None
-    df_i['Test3'][1] = None
+    df_i['Test1'][3] = np.nan
+    df_i['Test2'][2] = np.nan
+    df_i['Test2'][3] = np.nan
+    df_i['Test3'][1] = np.nan
     df_i['Test4'][1] = 10000.0
     df_i['Test4'][3] = 20000.0
 
diff --git a/tests/test_invoke_watson_studio.py b/tests/test_invoke_watson_studio.py
index 15308034..fb67b51d 100644
--- a/tests/test_invoke_watson_studio.py
+++ b/tests/test_invoke_watson_studio.py
@@ -1,8 +1,12 @@
-# Licensed Materials - Property of IBM
-# 5737-M66, 5900-AAA, 5900-A0N, 5725-S86, 5737-I75
-# (C) Copyright IBM Corp. 2020, 2022 All Rights Reserved.
-# US Government Users Restricted Rights - Use, duplication, or disclosure
-# restricted by GSA ADP Schedule Contract with IBM Corp.
+# *****************************************************************************
+# © Copyright IBM Corp. 2020, 2022 All Rights Reserved.
+#
+# This program and the accompanying materials
+# are made available under the terms of the Apache V2.0 license
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# *****************************************************************************
 
 import logging
 import unittest