1212# limitations under the License.
1313
1414import abc
15+ import numbers
1516import threading
1617import time
1718
@@ -100,13 +101,14 @@ def __init__(self,
100101 self .logger .error (enums .Errors .INVALID_INPUT .format ('notification_center' ))
101102 self .notification_center = _notification_center .NotificationCenter ()
102103
104+ self .executor = None
103105 if start_on_init is True :
104106 self .start ()
105107
106108 @property
107109 def is_running (self ):
108110 """ Property to check if consumer thread is alive or not. """
109- return self .executor .isAlive ()
111+ return self .executor .isAlive () if self . executor else False
110112
111113 def _validate_intantiation_props (self , prop , prop_name ):
112114 """ Method to determine if instantiation properties like batch_size, flush_interval
@@ -121,12 +123,18 @@ def _validate_intantiation_props(self, prop, prop_name):
121123 False if property name is batch_size and value is a floating point number.
122124 True otherwise.
123125 """
124- if (prop_name == 'batch_size' and not isinstance (prop , int )) or prop is None or prop <= 0 or \
125- not validator .is_finite_number (prop ):
126+ is_valid = True
127+
128+ if prop is None or not validator .is_finite_number (prop ) or prop <= 0 :
129+ is_valid = False
130+
131+ if prop_name == 'batch_size' and not isinstance (prop , numbers .Integral ):
132+ is_valid = False
133+
134+ if is_valid is False :
126135 self .logger .info ('Using default value for {}.' .format (prop_name ))
127- return False
128136
129- return True
137+ return is_valid
130138
131139 def _get_time (self , _time = None ):
132140 """ Method to return rounded off time as integer in seconds. If _time is None, uses current time.
@@ -279,7 +287,8 @@ def stop(self):
279287 self .event_queue .put (self ._SHUTDOWN_SIGNAL )
280288 self .logger .warning ('Stopping Scheduler.' )
281289
282- self .executor .join (self .timeout_interval .total_seconds ())
290+ if self .executor :
291+ self .executor .join (self .timeout_interval .total_seconds ())
283292
284293 if self .is_running :
285294 self .logger .error ('Timeout exceeded while attempting to close for ' + str (self .timeout_interval ) + ' ms.' )
0 commit comments