3232 gen_overlay ,
3333)
3434from .elastic import (
35+ get_field_type ,
3536 get_search_base ,
3637 convert_composite ,
3738 split_fieldname_to_list ,
3839 get_nested_field_from_hit ,
3940 to_32bit_float ,
41+ Scan ,
4042 ScanAggs ,
4143 get_tile_categories ,
4244 scan
@@ -534,6 +536,9 @@ def get_span_upper_bound(span_range: str, estimated_points_per_tile: Optional[in
534536 if span_range == "wide" :
535537 return math .log (1e9 )
536538
539+ if span_range == "ultrawide" :
540+ return math .log (1e308 )
541+
537542 assert estimated_points_per_tile is not None
538543 return math .log (max (estimated_points_per_tile * 2 , 2 ))
539544
@@ -981,7 +986,7 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
981986 base_s = get_search_base (config .elastic_hosts , headers , params , idx )
982987
983988 # Now find out how many documents
984- count_s = copy .copy (base_s )
989+ count_s = copy .copy (base_s )[ 0 : 0 ] # the [0:0] slice sets from/size to zero; since we only aggregate the data, we don't need the hits
985990 count_s = count_s .filter ("geo_bounding_box" , ** {geopoint_field : bb_dict })
986991
987992 doc_cnt = count_s .count ()
@@ -1109,26 +1114,89 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
11091114
11101115 # the composite needs one bin for 'after_key'
11111116 composite_agg_size = int (max_bins / inner_agg_size ) - 1
1112-
1113- resp = ScanAggs (
1114- tile_s ,
1115- {"grids" : A ("geotile_grid" , field = geopoint_field , precision = geotile_precision )},
1116- inner_aggs ,
1117- size = composite_agg_size ,
1118- timeout = config .query_timeout_seconds
1119- )
1120-
1117+ field_type = get_field_type (config .elastic_hosts , headers , params ,geopoint_field , idx )
11211118 partial_data = False # TODO can we get partial data?
1122- df = pd .DataFrame (
1123- convert_composite (
1124- resp .execute (),
1125- (category_field is not None ),
1126- bool (category_filters ),
1127- histogram_interval ,
1128- category_type ,
1129- category_format
1119+ if field_type == "geo_point" :
1120+ resp = ScanAggs (
1121+ tile_s ,
1122+ {"grids" : A ("geotile_grid" , field = geopoint_field , precision = geotile_precision )},
1123+ inner_aggs ,
1124+ size = composite_agg_size ,
1125+ timeout = config .query_timeout_seconds
11301126 )
1131- )
1127+
1128+
1129+ df = pd .DataFrame (
1130+ convert_composite (
1131+ resp .execute (),
1132+ (category_field is not None ),
1133+ bool (category_filters ),
1134+ histogram_interval ,
1135+ category_type ,
1136+ category_format
1137+ )
1138+ )
1139+ estimated_points_per_tile = get_estimated_points_per_tile (span_range , global_bounds , z , global_doc_cnt )
1140+ elif field_type == "geo_shape" :
1141+ shape_s = copy .copy (tile_s )
1142+ searches = []
1143+ estimated_points_per_tile = 10000
1144+ zoom = 0
1145+ #span_range = "ultrawide"
1146+ if resolution == "coarse" :
1147+ zoom = 5
1148+ spread = 7
1149+ elif resolution == "fine" :
1150+ zoom = 6
1151+ spread = 3
1152+ elif resolution == "finest" :
1153+ zoom = 7
1154+ spread = 1
1155+ searches = []
1156+ composite_agg_size = 65536 #max agg bucket size
1157+ geotile_precision = current_zoom + zoom
1158+ subtile_bb_dict = create_bounding_box_for_tile (x , y , z )
1159+ subtile_s = copy .copy (base_s )
1160+ subtile_s = subtile_s [0 :0 ]
1161+ subtile_s = subtile_s .filter ("geo_bounding_box" , ** {geopoint_field : subtile_bb_dict })
1162+ subtile_s .aggs .bucket ("comp" , "geotile_grid" , field = geopoint_field ,precision = geotile_precision ,size = composite_agg_size ,bounds = subtile_bb_dict )
1163+ searches .append (subtile_s )
1164+ #logger.info(inner_aggs)
1165+ cmap = "bmy" # TODO: have the front end pass the cmap for non-categorical data
1166+ def calc_aggregation (bucket ,search ):
1167+ #get bounds from bucket.key
1168+ #do search for sum of values on category_field
1169+ z , x , y = [ int (x ) for x in bucket .key .split ("/" ) ]
1170+ bucket_bb_dict = create_bounding_box_for_tile (x , y , z )
1171+ subtile_s = copy .copy (base_s )
1172+ subtile_s .aggs .bucket ("sum" ,"median_absolute_deviation" ,field = category_field ,missing = 0 )
1173+ subtile_s = subtile_s [0 :0 ]
1174+ subtile_s = subtile_s .filter ("geo_bounding_box" , ** {geopoint_field : bucket_bb_dict })
1175+ response = subtile_s .execute ()
1176+ search .num_searches += 1
1177+ search .total_took += response .took
1178+ search .total_shards += response ._shards .total # pylint: disable=W0212
1179+ search .total_skipped += response ._shards .skipped # pylint: disable=W0212
1180+ search .total_successful += response ._shards .successful # pylint: disable=W0212
1181+ search .total_failed += response ._shards .failed # pylint: disable=W0212
1182+ bucket .doc_count = response .aggregations .sum ['value' ] #replace with sum of category_field
1183+ return bucket
1184+ bucket_callback = None
1185+ if category_field :
1186+ bucket_callback = calc_aggregation
1187+ resp = Scan (searches ,timeout = config .query_timeout_seconds ,bucket_callback = bucket_callback )
1188+ df = pd .DataFrame (
1189+ convert_composite (
1190+ resp .execute (),
1191+ False ,# we don't need categorical: ES doesn't support composite buckets for geo_shapes; we calculate that with a secondary search in the bucket_callback
1192+ False ,# we don't need filter_buckets: ES doesn't support composite buckets for geo_shapes; we calculate that with a secondary search in the bucket_callback
1193+ histogram_interval ,
1194+ category_type ,
1195+ category_format
1196+ )
1197+ )
1198+ if len (df )/ resp .num_searches == composite_agg_size :
1199+ logger .warn ("clipping on tile %s" ,[x ,y ,z ])
11321200
11331201 s2 = time .time ()
11341202 logger .info ("ES took %s (%s) for %s with %s searches" , (s2 - s1 ), resp .total_took , len (df ), resp .num_searches )
@@ -1142,7 +1210,6 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
11421210 metrics ["shards_failed" ] = resp .total_failed
11431211 logger .info ("%s" , metrics )
11441212
1145- estimated_points_per_tile = get_estimated_points_per_tile (span_range , global_bounds , z , global_doc_cnt )
11461213
11471214 if len (df .index ) == 0 :
11481215 img = gen_empty (tile_width_px , tile_height_px )
@@ -1154,7 +1221,7 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
11541221
11551222 ###############################################################
11561223 # Category Mode
1157- if category_field :
1224+ if category_field and field_type != "geo_shape" :
11581225 # TODO it would be nice if datashader honored the category orders
11591226 # in z-order, then we could make "Other" drawn underneath the less
11601227 # promenent colors
0 commit comments