From 38c82e9cb0fc6a6cfb12e4f0004efbd807578637 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Fri, 5 Sep 2025 03:43:53 +0530 Subject: [PATCH] Fix: replace `[]` with `list` to prevent errors and bugs in path_vector.py file --- README.md | 15 ++++++------- src/DenStream/DenStream.py | 42 ++++++++++++++++++----------------- src/DenStream/MicroCluster.py | 4 ++-- src/path_vector.py | 30 +++++++++++++------------ src/read_trace.py | 15 ++++++++----- src/sketch.py | 10 ++++----- src/trace_mesh.py | 37 +++++++++++++++++++----------- 7 files changed, 85 insertions(+), 68 deletions(-) diff --git a/README.md b/README.md index 1d01ebd..7eff546 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,14 @@ -## TraceMesh: Scalable and Streaming Sampling for Distributed Traces +# TraceMesh: Scalable and Streaming Sampling for Distributed Traces This is the replication package for [CLOUD'24] [TraceMesh: Scalable and Streaming Sampling for Distributed Traces](https://arxiv.org/abs/2406.06975). In this paper, we propose **TraceMesh**, a scalable and streaming trace sampler. - ![Overall framework of *TraceMesh*](./docs/TraceMesh.png) -### Repository Organization +## Repository Organization -``` +```python ├── docs/ ├── datasets/online_boutique │ ├── train.csv # The training trace dataset @@ -25,9 +24,9 @@ In this paper, we propose **TraceMesh**, a scalable and streaming trace sampler. └── README.md ``` -### Quick Start +## Quick Start -#### Installation +### Installation 1. Install [python >= 3.9](https://www.python.org/downloads/). @@ -35,7 +34,7 @@ In this paper, we propose **TraceMesh**, a scalable and streaming trace sampler. ​```pip install -r requirements.txt``` -#### Data Preparation +### Data Preparation We have collected, processed and cleaned the trace data from the `online_boutique` system for demonstration purposes. The trace data is available in the form of CSV files within the `datasets/` directory. @@ -48,7 +47,7 @@ cd datasets/online_boutique tar -xzvf test.tar.gz ``` -#### Demo Execution +### Demo Execution - Run *TraceMesh* diff --git a/src/DenStream/DenStream.py b/src/DenStream/DenStream.py index 21d9b41..739f0e4 100644 --- a/src/DenStream/DenStream.py +++ b/src/DenStream/DenStream.py @@ -7,6 +7,7 @@ from sklearn.cluster import DBSCAN from scipy.spatial.distance import cosine + class DenStream: def __init__(self, lambd=0, eps=0.001, beta=2, mu=1, init_clusters=None, budget=0.01): @@ -47,7 +48,7 @@ def __init__(self, lambd=0, eps=0.001, beta=2, mu=1, init_clusters=None, budget= self.tp = ceil((1 / lambd) * np.log((beta * mu) / (beta * mu - 1))) else: self.tp = sys.maxsize - + self.o_micro_clusters = [] self.p_micro_clusters = [] self.budget = budget @@ -59,7 +60,6 @@ def __init__(self, lambd=0, eps=0.001, beta=2, mu=1, init_clusters=None, budget= micro_cluster.insert_sample(sample, 1.0) self.p_micro_clusters.append(micro_cluster) - def partial_fit(self, X, y=None, sample_weight=None): """ Online learning. 
@@ -95,7 +95,7 @@ def partial_fit(self, X, y=None, sample_weight=None): for sample, weight in zip(X, sample_weight): self._partial_fit(sample, weight) return self - + def process_sample(self, sample, weight=1.0): sample = np.array(sample, dtype=np.float64) return self._partial_fit(sample, weight) @@ -134,12 +134,12 @@ def fit_predict(self, X, y=None, sample_weight=None): for sample, weight in zip(X, sample_weight): self._partial_fit(sample, weight) - + p_micro_cluster_centers = np.array([p_micro_cluster.center() for p_micro_cluster in self.p_micro_clusters]) - p_micro_cluster_weights = [p_micro_cluster.weight() for p_micro_cluster in - self.p_micro_clusters] + p_micro_cluster_weights = [p_micro_cluster.weight() + for p_micro_cluster in self.p_micro_clusters] print("before: ", len(p_micro_cluster_centers)) print(len(self.o_micro_clusters)) dbscan = DBSCAN(eps=0.1, algorithm='brute') @@ -149,8 +149,8 @@ def fit_predict(self, X, y=None, sample_weight=None): y = [] for sample in X: - index, _ = self._get_nearest_micro_cluster(sample, - self.p_micro_clusters) + index, _ = self._get_nearest_micro_cluster( + sample, self.p_micro_clusters) y.append(dbscan.labels_[index]) return y @@ -181,11 +181,13 @@ def _merging(self, sample, weight): # Try to merge the sample with its nearest p_micro_cluster _, nearest_p_micro_cluster = \ self._get_nearest_micro_cluster(sample, self.p_micro_clusters) - first_success = self._try_merge(sample, weight, nearest_p_micro_cluster) + first_success = self._try_merge( + sample, weight, nearest_p_micro_cluster) if first_success: - sum_weight = sum([p_micro_cluster.weight() for p_micro_cluster in - self.p_micro_clusters]) - prob = self.budget * (1 - nearest_p_micro_cluster.weight() / sum_weight) + sum_weight = sum([p_micro_cluster.weight() + for p_micro_cluster in self.p_micro_clusters]) + prob = self.budget * \ + (1 - nearest_p_micro_cluster.weight() / sum_weight) if np.random.random() < prob: return True return False @@ -211,10 +213,8 @@ def _decay_function(self, t): def _partial_fit(self, sample, weight): sampled = self._merging(sample, weight) if self.t % self.tp == 0: - self.p_micro_clusters = [p_micro_cluster for p_micro_cluster - in self.p_micro_clusters if - p_micro_cluster.weight() >= self.beta * - self.mu] + self.p_micro_clusters = [p_micro_cluster for p_micro_cluster in self.p_micro_clusters if + p_micro_cluster.weight() >= self.beta * self.mu] # Xis = [((self._decay_function(self.t - o_micro_cluster.creation_time # + self.tp) - 1) / # (self._decay_function(self.tp) - 1)) for o_micro_cluster in @@ -232,17 +232,19 @@ def _validate_sample_weight(self, sample_weight, n_samples): sample_weight = np.ones(n_samples, dtype=np.float64, order='C') else: # user-provided array - sample_weight = np.asarray(sample_weight, dtype=np.float64, - order="C") + sample_weight = np.asarray( + sample_weight, dtype=np.float64, order="C") if sample_weight.shape[0] != n_samples: raise ValueError("Shapes of X and sample_weight do not match.") return sample_weight def print_info(self): - p_sizes = [p_micro_cluster.size for p_micro_cluster in self.p_micro_clusters] + p_sizes = [ + p_micro_cluster.size for p_micro_cluster in self.p_micro_clusters] print(f"Number of p_micro_clusters is {len(self.p_micro_clusters)}") print(f"Size of p_micro_clusters is {p_sizes}") - o_sizes = [o_micro_cluster.size for o_micro_cluster in self.o_micro_clusters] + o_sizes = [ + o_micro_cluster.size for o_micro_cluster in self.o_micro_clusters] print(f"Number of o_micro_clusters is {len(self.o_micro_clusters)}") 
print(f"Size of o_micro_clusters is {o_sizes}") diff --git a/src/DenStream/MicroCluster.py b/src/DenStream/MicroCluster.py index f469c39..a92c041 100644 --- a/src/DenStream/MicroCluster.py +++ b/src/DenStream/MicroCluster.py @@ -47,7 +47,7 @@ def center(self): def weight(self): return self.sum_of_weights - + def size(self): return self.size @@ -56,4 +56,4 @@ def __copy__(self): new_micro_cluster.sum_of_weights = self.sum_of_weights new_micro_cluster.variance = self.variance new_micro_cluster.mean = self.mean - return new_micro_cluster \ No newline at end of file + return new_micro_cluster diff --git a/src/path_vector.py b/src/path_vector.py index c92b40b..2e1bc5d 100644 --- a/src/path_vector.py +++ b/src/path_vector.py @@ -9,7 +9,7 @@ def divide_shingle_chunks(shingle: str, chunk_length: int): class path_embedding: - def __init__(self, chunk_length: int=20): + def __init__(self, chunk_length: int = 20): self.operation_to_id = {} self.chunk_length = chunk_length @@ -35,7 +35,6 @@ def convert_duration_to_id(self, duration: int): cnt += 1 return cnt - def build_graph(self, calls: list): """ Build graph from calls @@ -59,21 +58,23 @@ def build_graph(self, calls: list): nodes.sort(key=lambda x: span_to_operation[x]) return nodes, edges, span_to_operation - - def dfs(self, node: str, edges: dict, span_to_operation: dict, visited_nodes: list, current_path: [], current_duration: None, all_paths: list): + def dfs(self, node: str, edges: dict, span_to_operation: dict, visited_nodes: list, current_path: list, current_duration: None, all_paths: list): """ DFS to get all paths """ if node in visited_nodes: return [] visited_nodes.add(node) - current_path.append(self.convert_operation_to_id(span_to_operation[node])) + current_path.append( + self.convert_operation_to_id(span_to_operation[node])) if current_duration is not None: - all_paths.append((current_path, self.convert_duration_to_id(current_duration))) + all_paths.append( + (current_path, self.convert_duration_to_id(current_duration))) if node not in edges: return for span_id, duration in edges[node]: - self.dfs(span_id, edges, span_to_operation, visited_nodes, current_path.copy(), duration, all_paths) + self.dfs(span_id, edges, span_to_operation, visited_nodes, + current_path.copy(), duration, all_paths) return def convert_calls_to_path(self, calls: list): @@ -83,20 +84,21 @@ def convert_calls_to_path(self, calls: list): shingles = [] nodes, edges, span_to_operation = self.build_graph(calls) # for span, operation in span_to_operation.items(): - # print(span, operation, self.convert_operation_to_id(operation)) + # print(span, operation, self.convert_operation_to_id(operation)) visited_nodes = set() all_paths = [] - self.dfs("root", edges, span_to_operation, visited_nodes, [], None, all_paths) + self.dfs("root", edges, span_to_operation, + visited_nodes, [], None, all_paths) # print(visited_nodes) # print(all_paths) for node in nodes: if node not in visited_nodes: all_paths = [] - self.dfs(node, edges, span_to_operation, visited_nodes, [], None, all_paths) + self.dfs(node, edges, span_to_operation, + visited_nodes, [], None, all_paths) # print(all_paths) return all_paths - def convert_paths_to_chunk(self, shingles: list): """ Convert shingles to chunks @@ -104,7 +106,8 @@ def convert_paths_to_chunk(self, shingles: list): shingles_count = {} for shingle_data in shingles: shingle_list, duration = shingle_data - chunked_shingles = divide_shingle_chunks(shingle_list, self.chunk_length) + chunked_shingles = divide_shingle_chunks( + shingle_list, 
self.chunk_length) for c in chunked_shingles: chunk = tuple(c) if chunk not in shingles_count: @@ -112,7 +115,6 @@ def convert_paths_to_chunk(self, shingles: list): shingles_count[chunk] = max(shingles_count[chunk], duration) return [(list(shingle), cnt) for shingle, cnt in shingles_count.items()] - def convert_call_to_chunked_paths(self, calls: list): """ Convert calls to chunked shingles @@ -120,4 +122,4 @@ def convert_call_to_chunked_paths(self, calls: list): shingles = self.convert_calls_to_path(calls) # print(shingles[1]) chunked_shingles = self.convert_paths_to_chunk(shingles) - return chunked_shingles \ No newline at end of file + return chunked_shingles diff --git a/src/read_trace.py b/src/read_trace.py index 4cfd378..16dcb4b 100644 --- a/src/read_trace.py +++ b/src/read_trace.py @@ -17,10 +17,12 @@ def extract_calls(trace: pd.DataFrame): print("Trace contains multiple TraceIDs") return None trace = trace.sort_values(by="StartTimeUnixNano") - id_to_operation = trace[['SpanID', 'OperationName']].set_index('SpanID').to_dict()['OperationName'] + id_to_operation = trace[['SpanID', 'OperationName']].set_index('SpanID').to_dict()[ + 'OperationName'] id_to_operation["root"] = "root" trace['ParentOperation'] = trace['ParentID'].map(id_to_operation) - calls = [tuple(row) for row in trace[["OperationName", "ParentOperation", "Duration", "SpanID", "ParentID"]].values] + calls = [tuple(row) for row in trace[["OperationName", + "ParentOperation", "Duration", "SpanID", "ParentID"]].values] return calls @@ -32,7 +34,7 @@ def get_calls(traces: pd.DataFrame): for trace_id, trace in traces.groupby("TraceID"): all_calls.append((trace_id, extract_calls(trace))) return all_calls - + def process_trace_dataset(csv_file: str): """ @@ -49,8 +51,9 @@ def process_trace_dataset(csv_file: str): # all_spans = pd.concat(spans_list, ignore_index=True) all_spans = read_csv(csv_file) print("Total Number of Spans: {}".format(len(all_spans))) - print("Total Number of Traces: {}".format(len(all_spans["TraceID"].unique()))) - + print("Total Number of Traces: {}".format( + len(all_spans["TraceID"].unique()))) + all_calls = get_calls(all_spans) print("Total Number of call groups: {}".format(len(all_calls))) - return all_calls \ No newline at end of file + return all_calls diff --git a/src/sketch.py b/src/sketch.py index 8fc9551..d818ccc 100644 --- a/src/sketch.py +++ b/src/sketch.py @@ -16,7 +16,7 @@ def cosine_similarity(vector_a, vector_b): class SketchHash(): - def __init__(self, L: int=1000, chunk_length: int=10): + def __init__(self, L: int = 1000, chunk_length: int = 10): self.L = L self.chunk_length = chunk_length self.H = [] @@ -42,8 +42,8 @@ def hashmulti(self, shingle_list: list, randbits: list): sum = randbits[0] for i in range(len(shingle_list)): sum += shingle_list[i] * randbits[i + 1] - return 2 * ((sum >> 63) & 1) - 1 - + return 2 * ((sum >> 63) & 1) - 1 + def construct_streamhash_sketch(self, shingles: list): """ Construct streamhash sketch for a given list of shingles @@ -53,9 +53,9 @@ def construct_streamhash_sketch(self, shingles: list): for shingle in shingles: shingle_list, count = shingle for i in range(self.L): - projection[i] += count * self.hashmulti(shingle_list, self.H[i]) + projection[i] += count * \ + self.hashmulti(shingle_list, self.H[i]) sketch = [1 if p >= 0 else 0 for p in projection] return (sketch, projection) - diff --git a/src/trace_mesh.py b/src/trace_mesh.py index ff1390a..edad499 100644 --- a/src/trace_mesh.py +++ b/src/trace_mesh.py @@ -21,11 +21,16 @@ def 
calculate_metrics(predicted_list, ground_truth_list, original_size): def get_config(): parser = argparse.ArgumentParser() - parser.add_argument("--sketch_length", type=int, default=100, help="Length of the sketch") - parser.add_argument("--eps", type=float, default=0.01, help="Epsilon value for clustering") - parser.add_argument("--data_path", type=str, default="../datasets/", help="Path to the dataset") - parser.add_argument("--dataset", type=str, default="online_boutique", help="Name of the dataset") - parser.add_argument("--budget", type=float, default=0.01, help="Budget for the sampling") + parser.add_argument("--sketch_length", type=int, + default=100, help="Length of the sketch") + parser.add_argument("--eps", type=float, default=0.01, + help="Epsilon value for clustering") + parser.add_argument("--data_path", type=str, + default="../datasets/", help="Path to the dataset") + parser.add_argument("--dataset", type=str, + default="online_boutique", help="Name of the dataset") + parser.add_argument("--budget", type=float, default=0.01, + help="Budget for the sampling") args = parser.parse_args() return args @@ -33,7 +38,8 @@ def get_config(): if __name__ == "__main__": args = get_config() - all_calls = process_trace_dataset(os.path.join(args.data_path, args.dataset, "train.csv")) + all_calls = process_trace_dataset(os.path.join( + args.data_path, args.dataset, "train.csv")) graph = path_embedding(chunk_length=100) sketch_hash = SketchHash(L=args.sketch_length, chunk_length=100) @@ -41,10 +47,12 @@ def get_config(): for idx, call_tuple in enumerate(all_calls): trace_id, call = call_tuple chunked_shingles = graph.convert_call_to_chunked_paths(call) - sketch, projection = sketch_hash.construct_streamhash_sketch(chunked_shingles) + sketch, projection = sketch_hash.construct_streamhash_sketch( + chunked_shingles) sketch_list.append(sketch) - dbscan = DBSCAN(eps=args.eps, min_samples=1, algorithm="brute", metric="cosine") + dbscan = DBSCAN(eps=args.eps, min_samples=1, + algorithm="brute", metric="cosine") dbscan.fit(sketch_list) labels = dbscan.labels_ @@ -54,17 +62,20 @@ def get_config(): if label not in clusters: clusters[label] = [] clusters[label].append(sketch_list[i]) - - DS_clusterer = DenStream(lambd=0.01, eps=args.eps, beta=10, mu=1, init_clusters=clusters, budget=args.budget) + + DS_clusterer = DenStream(lambd=0.01, eps=args.eps, beta=10, + mu=1, init_clusters=clusters, budget=args.budget) # DS_clusterer.print_info() - test_calls = process_trace_dataset(os.path.join(args.data_path, args.dataset, "test.csv")) + test_calls = process_trace_dataset(os.path.join( + args.data_path, args.dataset, "test.csv")) sampled_count = 0 sampled_trace = [] for idx, call_tuple in tqdm(enumerate(test_calls), total=len(test_calls)): trace_id, call = call_tuple chunked_shingles = graph.convert_call_to_chunked_paths(call) - sketch, projection = sketch_hash.construct_streamhash_sketch(chunked_shingles) + sketch, projection = sketch_hash.construct_streamhash_sketch( + chunked_shingles) sampled = DS_clusterer.process_sample(sketch) if sampled: sampled_count += 1 @@ -77,5 +88,5 @@ def get_config(): lines = file.readlines() for line in lines: anomaly_labels.append(str(line.strip())) - + calculate_metrics(sampled_trace, anomaly_labels, len(test_calls))
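
A minimal sketch of the type-annotation fix this patch applies in `src/path_vector.py`, where the `dfs` signature changes `current_path: []` to `current_path: list`. The function below (`dfs_annotated`) is an illustrative stand-in, not the repository's code, and the `Optional[int]` hint for the duration is only a suggestion (the patch itself keeps `current_duration: None`). The point: `[]` in an annotation is an empty list *object* stored in `__annotations__`, not a type, so static checkers such as mypy reject it, whereas `list` states the intended parameter type.

```python
from typing import Optional


# Hypothetical stand-in for path_embedding.dfs, showing the corrected
# annotation style; the name and the Optional[int] hint are illustrative,
# not taken from the repository.
def dfs_annotated(node: str,
                  current_path: list,                     # was `current_path: []` before the patch
                  current_duration: Optional[int] = None) -> list:
    """Append `node` to the running path (toy version of the real DFS)."""
    current_path.append(node)
    return current_path


if __name__ == "__main__":
    # Both spellings run at definition time, but only `list` is a real type
    # that type checkers and annotation-reading tools understand.
    print(dfs_annotated("root", []))        # ['root']
    print(dfs_annotated.__annotations__)    # {'node': <class 'str'>, 'current_path': <class 'list'>, ...}
```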