15 changes: 7 additions & 8 deletions README.md
@@ -1,15 +1,14 @@
## TraceMesh: Scalable and Streaming Sampling for Distributed Traces
# TraceMesh: Scalable and Streaming Sampling for Distributed Traces

This is the replication package for [CLOUD'24] [TraceMesh: Scalable and Streaming Sampling for Distributed Traces](https://arxiv.org/abs/2406.06975).

In this paper, we propose **TraceMesh**, a scalable and streaming trace sampler.


![Overall framework of *TraceMesh*](./docs/TraceMesh.png)

### Repository Organization
## Repository Organization

```
```python
├── docs/
├── datasets/online_boutique
│ ├── train.csv # The training trace dataset
@@ -25,17 +24,17 @@ In this paper, we propose **TraceMesh**, a scalable and streaming trace sampler.
└── README.md
```

### Quick Start
## Quick Start

#### Installation
### Installation

1. Install [python >= 3.9](https://www.python.org/downloads/).

2. Install the dependencies needed by *TraceMesh* with the following command.

```pip install -r requirements.txt```

#### Data Preparation
### Data Preparation

We have collected, processed and cleaned the trace data from the `online_boutique` system for demonstration purposes.
The trace data is available in the form of CSV files within the `datasets/` directory.
@@ -48,7 +47,7 @@ cd datasets/online_boutique
tar -xzvf test.tar.gz
```

#### Demo Execution
### Demo Execution

- Run *TraceMesh*

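
For reference, a minimal sketch of what the prepared data looks like once extracted. The file path is illustrative, and the column names (`TraceID`, `SpanID`, `ParentID`, `OperationName`, `Duration`, `StartTimeUnixNano`) are assumed from the fields referenced in `src/read_trace.py`:

```python
import pandas as pd

# Illustrative path: the archives under datasets/online_boutique/ are assumed
# to extract to CSV files with one row per span.
spans = pd.read_csv("datasets/online_boutique/test.csv")

print(f"Spans: {len(spans)}, Traces: {spans['TraceID'].nunique()}")

# Group spans into traces and order them by start time, as src/read_trace.py does.
for trace_id, trace in spans.groupby("TraceID"):
    trace = trace.sort_values(by="StartTimeUnixNano")
    print(trace_id, trace["OperationName"].tolist())
    break  # show only the first trace
```
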
42 changes: 22 additions & 20 deletions src/DenStream/DenStream.py
@@ -7,6 +7,7 @@
from sklearn.cluster import DBSCAN
from scipy.spatial.distance import cosine


class DenStream:

def __init__(self, lambd=0, eps=0.001, beta=2, mu=1, init_clusters=None, budget=0.01):
@@ -47,7 +48,7 @@ def __init__(self, lambd=0, eps=0.001, beta=2, mu=1, init_clusters=None, budget=
self.tp = ceil((1 / lambd) * np.log((beta * mu) / (beta * mu - 1)))
else:
self.tp = sys.maxsize

self.o_micro_clusters = []
self.p_micro_clusters = []
self.budget = budget
@@ -59,7 +60,6 @@ def __init__(self, lambd=0, eps=0.001, beta=2, mu=1, init_clusters=None, budget=
micro_cluster.insert_sample(sample, 1.0)
self.p_micro_clusters.append(micro_cluster)


def partial_fit(self, X, y=None, sample_weight=None):
"""
Online learning.
@@ -95,7 +95,7 @@ def partial_fit(self, X, y=None, sample_weight=None):
for sample, weight in zip(X, sample_weight):
self._partial_fit(sample, weight)
return self

def process_sample(self, sample, weight=1.0):
sample = np.array(sample, dtype=np.float64)
return self._partial_fit(sample, weight)
@@ -134,12 +134,12 @@ def fit_predict(self, X, y=None, sample_weight=None):

for sample, weight in zip(X, sample_weight):
self._partial_fit(sample, weight)

p_micro_cluster_centers = np.array([p_micro_cluster.center() for
p_micro_cluster in
self.p_micro_clusters])
p_micro_cluster_weights = [p_micro_cluster.weight() for p_micro_cluster in
self.p_micro_clusters]
p_micro_cluster_weights = [p_micro_cluster.weight()
for p_micro_cluster in self.p_micro_clusters]
print("before: ", len(p_micro_cluster_centers))
print(len(self.o_micro_clusters))
dbscan = DBSCAN(eps=0.1, algorithm='brute')
Expand All @@ -149,8 +149,8 @@ def fit_predict(self, X, y=None, sample_weight=None):

y = []
for sample in X:
index, _ = self._get_nearest_micro_cluster(sample,
self.p_micro_clusters)
index, _ = self._get_nearest_micro_cluster(
sample, self.p_micro_clusters)
y.append(dbscan.labels_[index])

return y
@@ -181,11 +181,13 @@ def _merging(self, sample, weight):
# Try to merge the sample with its nearest p_micro_cluster
_, nearest_p_micro_cluster = \
self._get_nearest_micro_cluster(sample, self.p_micro_clusters)
first_success = self._try_merge(sample, weight, nearest_p_micro_cluster)
first_success = self._try_merge(
sample, weight, nearest_p_micro_cluster)
if first_success:
sum_weight = sum([p_micro_cluster.weight() for p_micro_cluster in
self.p_micro_clusters])
prob = self.budget * (1 - nearest_p_micro_cluster.weight() / sum_weight)
sum_weight = sum([p_micro_cluster.weight()
for p_micro_cluster in self.p_micro_clusters])
prob = self.budget * \
(1 - nearest_p_micro_cluster.weight() / sum_weight)
if np.random.random() < prob:
return True
return False
@@ -211,10 +213,8 @@ def _decay_function(self, t):
def _partial_fit(self, sample, weight):
sampled = self._merging(sample, weight)
if self.t % self.tp == 0:
self.p_micro_clusters = [p_micro_cluster for p_micro_cluster
in self.p_micro_clusters if
p_micro_cluster.weight() >= self.beta *
self.mu]
self.p_micro_clusters = [p_micro_cluster for p_micro_cluster in self.p_micro_clusters if
p_micro_cluster.weight() >= self.beta * self.mu]
# Xis = [((self._decay_function(self.t - o_micro_cluster.creation_time
# + self.tp) - 1) /
# (self._decay_function(self.tp) - 1)) for o_micro_cluster in
@@ -232,17 +232,19 @@ def _validate_sample_weight(self, sample_weight, n_samples):
sample_weight = np.ones(n_samples, dtype=np.float64, order='C')
else:
# user-provided array
sample_weight = np.asarray(sample_weight, dtype=np.float64,
order="C")
sample_weight = np.asarray(
sample_weight, dtype=np.float64, order="C")
if sample_weight.shape[0] != n_samples:
raise ValueError("Shapes of X and sample_weight do not match.")
return sample_weight

def print_info(self):
p_sizes = [p_micro_cluster.size for p_micro_cluster in self.p_micro_clusters]
p_sizes = [
p_micro_cluster.size for p_micro_cluster in self.p_micro_clusters]
print(f"Number of p_micro_clusters is {len(self.p_micro_clusters)}")
print(f"Size of p_micro_clusters is {p_sizes}")
o_sizes = [o_micro_cluster.size for o_micro_cluster in self.o_micro_clusters]
o_sizes = [
o_micro_cluster.size for o_micro_cluster in self.o_micro_clusters]
print(f"Number of o_micro_clusters is {len(self.o_micro_clusters)}")
print(f"Size of o_micro_clusters is {o_sizes}")

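
The sampling decision in `_merging` above comes down to a weight-dependent coin flip: once a trace merges into its nearest p-micro-cluster, it is kept with probability `budget * (1 - cluster_weight / total_weight)`, so traces landing in heavy (common) clusters are sampled less often. A small standalone sketch of that rule with illustrative numbers:

```python
import numpy as np

def keep_probability(budget: float, cluster_weight: float, total_weight: float) -> float:
    # Mirrors the rule in _merging: the heavier (more common) the matched
    # cluster, the smaller its share of the sampling budget.
    return budget * (1 - cluster_weight / total_weight)

# Illustrative numbers: a 1% budget and a cluster holding 40% of the total weight.
p = keep_probability(budget=0.01, cluster_weight=40.0, total_weight=100.0)
sampled = np.random.random() < p  # kept with probability ~0.006
```
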
4 changes: 2 additions & 2 deletions src/DenStream/MicroCluster.py
@@ -47,7 +47,7 @@ def center(self):

def weight(self):
return self.sum_of_weights

def size(self):
return self.size

@@ -56,4 +56,4 @@ def __copy__(self):
new_micro_cluster.sum_of_weights = self.sum_of_weights
new_micro_cluster.variance = self.variance
new_micro_cluster.mean = self.mean
return new_micro_cluster
return new_micro_cluster
30 changes: 16 additions & 14 deletions src/path_vector.py
@@ -9,7 +9,7 @@ def divide_shingle_chunks(shingle: str, chunk_length: int):


class path_embedding:
def __init__(self, chunk_length: int=20):
def __init__(self, chunk_length: int = 20):
self.operation_to_id = {}
self.chunk_length = chunk_length

@@ -35,7 +35,6 @@ def convert_duration_to_id(self, duration: int):
cnt += 1
return cnt


def build_graph(self, calls: list):
"""
Build graph from calls
@@ -59,21 +58,23 @@ def build_graph(self, calls: list):
nodes.sort(key=lambda x: span_to_operation[x])
return nodes, edges, span_to_operation


def dfs(self, node: str, edges: dict, span_to_operation: dict, visited_nodes: list, current_path: [], current_duration: None, all_paths: list):
def dfs(self, node: str, edges: dict, span_to_operation: dict, visited_nodes: list, current_path: list, current_duration: None, all_paths: list):
"""
DFS to get all paths
"""
if node in visited_nodes:
return []
visited_nodes.add(node)
current_path.append(self.convert_operation_to_id(span_to_operation[node]))
current_path.append(
self.convert_operation_to_id(span_to_operation[node]))
if current_duration is not None:
all_paths.append((current_path, self.convert_duration_to_id(current_duration)))
all_paths.append(
(current_path, self.convert_duration_to_id(current_duration)))
if node not in edges:
return
for span_id, duration in edges[node]:
self.dfs(span_id, edges, span_to_operation, visited_nodes, current_path.copy(), duration, all_paths)
self.dfs(span_id, edges, span_to_operation, visited_nodes,
current_path.copy(), duration, all_paths)
return

def convert_calls_to_path(self, calls: list):
@@ -83,41 +84,42 @@ def convert_calls_to_path(self, calls: list):
shingles = []
nodes, edges, span_to_operation = self.build_graph(calls)
# for span, operation in span_to_operation.items():
# print(span, operation, self.convert_operation_to_id(operation))
# print(span, operation, self.convert_operation_to_id(operation))
visited_nodes = set()
all_paths = []
self.dfs("root", edges, span_to_operation, visited_nodes, [], None, all_paths)
self.dfs("root", edges, span_to_operation,
visited_nodes, [], None, all_paths)
# print(visited_nodes)
# print(all_paths)
for node in nodes:
if node not in visited_nodes:
all_paths = []
self.dfs(node, edges, span_to_operation, visited_nodes, [], None, all_paths)
self.dfs(node, edges, span_to_operation,
visited_nodes, [], None, all_paths)
# print(all_paths)
return all_paths


def convert_paths_to_chunk(self, shingles: list):
"""
Convert shingles to chunks
"""
shingles_count = {}
for shingle_data in shingles:
shingle_list, duration = shingle_data
chunked_shingles = divide_shingle_chunks(shingle_list, self.chunk_length)
chunked_shingles = divide_shingle_chunks(
shingle_list, self.chunk_length)
for c in chunked_shingles:
chunk = tuple(c)
if chunk not in shingles_count:
shingles_count[chunk] = 0
shingles_count[chunk] = max(shingles_count[chunk], duration)
return [(list(shingle), cnt) for shingle, cnt in shingles_count.items()]


def convert_call_to_chunked_paths(self, calls: list):
"""
Convert calls to chunked shingles
"""
shingles = self.convert_calls_to_path(calls)
# print(shingles[1])
chunked_shingles = self.convert_paths_to_chunk(shingles)
return chunked_shingles
return chunked_shingles
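
`convert_paths_to_chunk` deduplicates fixed-length slices of each path and keeps the largest duration bucket seen for every slice. A self-contained sketch of that idea; the slicing helper here is an assumption, since the body of `divide_shingle_chunks` is not shown in this diff:

```python
def split_into_chunks(path: list, chunk_length: int = 20) -> list:
    # Assumed behaviour: cut the path into consecutive fixed-length slices.
    return [path[i:i + chunk_length] for i in range(0, len(path), chunk_length)]

def paths_to_chunks(paths: list, chunk_length: int = 20) -> list:
    # paths: (operation_id_list, duration_bucket) pairs, as produced by
    # convert_calls_to_path; duplicate chunks keep their maximum duration bucket.
    chunk_to_duration = {}
    for ops, duration in paths:
        for chunk in split_into_chunks(ops, chunk_length):
            key = tuple(chunk)
            chunk_to_duration[key] = max(chunk_to_duration.get(key, 0), duration)
    return [(list(chunk), d) for chunk, d in chunk_to_duration.items()]

print(paths_to_chunks([([1, 2, 3], 4), ([1, 2, 3], 7)], chunk_length=2))
# [([1, 2], 7), ([3], 7)]
```
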
15 changes: 9 additions & 6 deletions src/read_trace.py
@@ -17,10 +17,12 @@ def extract_calls(trace: pd.DataFrame):
print("Trace contains multiple TraceIDs")
return None
trace = trace.sort_values(by="StartTimeUnixNano")
id_to_operation = trace[['SpanID', 'OperationName']].set_index('SpanID').to_dict()['OperationName']
id_to_operation = trace[['SpanID', 'OperationName']].set_index('SpanID').to_dict()[
'OperationName']
id_to_operation["root"] = "root"
trace['ParentOperation'] = trace['ParentID'].map(id_to_operation)
calls = [tuple(row) for row in trace[["OperationName", "ParentOperation", "Duration", "SpanID", "ParentID"]].values]
calls = [tuple(row) for row in trace[["OperationName",
"ParentOperation", "Duration", "SpanID", "ParentID"]].values]
return calls


@@ -32,7 +34,7 @@ def get_calls(traces: pd.DataFrame):
for trace_id, trace in traces.groupby("TraceID"):
all_calls.append((trace_id, extract_calls(trace)))
return all_calls


def process_trace_dataset(csv_file: str):
"""
@@ -49,8 +51,9 @@ def process_trace_dataset(csv_file: str):
# all_spans = pd.concat(spans_list, ignore_index=True)
all_spans = read_csv(csv_file)
print("Total Number of Spans: {}".format(len(all_spans)))
print("Total Number of Traces: {}".format(len(all_spans["TraceID"].unique())))

print("Total Number of Traces: {}".format(
len(all_spans["TraceID"].unique())))

all_calls = get_calls(all_spans)
print("Total Number of call groups: {}".format(len(all_calls)))
return all_calls
return all_calls
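
`extract_calls` resolves each span's parent operation by mapping `ParentID` through a `SpanID`-to-`OperationName` lookup, with a synthetic `root` entry for spans whose parent lies outside the trace. A condensed sketch of that lookup on a toy two-span trace:

```python
import pandas as pd

# Toy trace with two spans; column names follow src/read_trace.py.
trace = pd.DataFrame({
    "SpanID": ["a", "b"],
    "ParentID": ["root", "a"],
    "OperationName": ["checkout", "charge-card"],
    "Duration": [120, 80],
    "StartTimeUnixNano": [1, 2],
})

id_to_operation = trace.set_index("SpanID")["OperationName"].to_dict()
id_to_operation["root"] = "root"  # parent of the entry span
trace["ParentOperation"] = trace["ParentID"].map(id_to_operation)

calls = [tuple(row) for row in
         trace[["OperationName", "ParentOperation", "Duration", "SpanID", "ParentID"]].values]
# Each call pairs an operation with its parent operation, duration, and span ids,
# e.g. ('charge-card', 'checkout', 80, 'b', 'a').
```
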
10 changes: 5 additions & 5 deletions src/sketch.py
@@ -16,7 +16,7 @@ def cosine_similarity(vector_a, vector_b):


class SketchHash():
def __init__(self, L: int=1000, chunk_length: int=10):
def __init__(self, L: int = 1000, chunk_length: int = 10):
self.L = L
self.chunk_length = chunk_length
self.H = []
@@ -42,8 +42,8 @@ def hashmulti(self, shingle_list: list, randbits: list):
sum = randbits[0]
for i in range(len(shingle_list)):
sum += shingle_list[i] * randbits[i + 1]
return 2 * ((sum >> 63) & 1) - 1
return 2 * ((sum >> 63) & 1) - 1

def construct_streamhash_sketch(self, shingles: list):
"""
Construct streamhash sketch for a given list of shingles
Expand All @@ -53,9 +53,9 @@ def construct_streamhash_sketch(self, shingles: list):
for shingle in shingles:
shingle_list, count = shingle
for i in range(self.L):
projection[i] += count * self.hashmulti(shingle_list, self.H[i])
projection[i] += count * \
self.hashmulti(shingle_list, self.H[i])

sketch = [1 if p >= 0 else 0 for p in projection]

return (sketch, projection)
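
`construct_streamhash_sketch` projects a trace's chunked paths onto `L` random ±1 hash functions and keeps only the sign of each coordinate, so traces with similar chunk sets end up with similar bit vectors. A toy end-to-end run of that projection, where `L` and the random-bit table are illustrative stand-ins for the class's `self.L` and `self.H`:

```python
import random

L = 8  # illustrative; the class defaults to L=1000
random.seed(0)
H = [[random.getrandbits(64) for _ in range(32)] for _ in range(L)]

def hashmulti(shingle_list: list, randbits: list) -> int:
    # Multilinear hash, as in SketchHash.hashmulti: map the chunk to +1 or -1
    # by looking at a single bit of an affine combination of its ids.
    s = randbits[0]
    for i, x in enumerate(shingle_list):
        s += x * randbits[i + 1]
    return 2 * ((s >> 63) & 1) - 1

# (chunk of operation ids, duration bucket) pairs from the chunking step.
chunks = [([3, 7, 7, 2], 5), ([3, 9], 1)]

projection = [0.0] * L
for chunk, count in chunks:
    for i in range(L):
        projection[i] += count * hashmulti(chunk, H[i])

sketch = [1 if p >= 0 else 0 for p in projection]
print(sketch)  # an L-bit vector summarizing the trace
```
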
