99 "log"
1010 "math"
1111 "math/rand"
12+ "net"
1213 "os"
1314 "os/signal"
1415 "sync"
@@ -171,7 +172,8 @@ func main() {
171172 if * password != "" {
172173 opts = append (opts , radix .DialAuthPass (* password ))
173174 }
174- connectionStr := fmt .Sprintf ("%s:%d" , * host , * port )
175+ ips , _ := net .LookupIP (* host )
176+
175177 stopChan := make (chan struct {})
176178 // a WaitGroup for the goroutines to tell us they've stopped
177179 wg := sync.WaitGroup {}
@@ -183,15 +185,15 @@ func main() {
183185 fmt .Printf ("Using random seed: %d\n " , * seed )
184186 rand .Seed (* seed )
185187 var cluster * radix.Cluster
186- var standalone * radix.Pool
187- if * clusterMode {
188- cluster = getOSSClusterConn (connectionStr , opts , * clients )
189- } else {
190- standalone = getStandaloneConn (connectionStr , opts , * clients )
191- }
188+
192189 datapointsChan := make (chan datapoint , * numberRequests )
193190 for channel_id := 1 ; uint64 (channel_id ) <= * clients ; channel_id ++ {
194191 wg .Add (1 )
192+ connectionStr := fmt .Sprintf ("%s:%d" , ips [rand .Int63n (int64 (len (ips )))], * port )
193+ if * clusterMode {
194+ cluster = getOSSClusterConn (connectionStr , opts , * clients )
195+ }
196+ fmt .Printf ("Using connection string %s for client %d\n " , connectionStr , channel_id )
195197 cmd := make ([]string , len (args ))
196198 copy (cmd , args )
197199 if * clusterMode {
@@ -200,7 +202,8 @@ func main() {
200202 if * multi {
201203 go ingestionRoutine (getStandaloneConn (connectionStr , opts , 1 ), * multi , datapointsChan , true , cmd , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , keyPlaceOlderPos , dataPlaceOlderPos , useRateLimiter , rateLimiter )
202204 } else {
203- go ingestionRoutine (standalone , * multi , datapointsChan , true , cmd , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , keyPlaceOlderPos , dataPlaceOlderPos , useRateLimiter , rateLimiter )
205+ go ingestionRoutine (getStandaloneConn (connectionStr , opts , 1 ), * multi , datapointsChan , true , cmd , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , keyPlaceOlderPos , dataPlaceOlderPos , useRateLimiter , rateLimiter )
206+ time .Sleep (time .Millisecond * 10 )
204207 }
205208 }
206209 }
@@ -212,6 +215,7 @@ func main() {
212215 tick := time .NewTicker (time .Duration (client_update_tick ) * time .Second )
213216 closed , _ , duration , totalMessages , _ := updateCLI (tick , c , * numberRequests , * loop , datapointsChan )
214217 messageRate := float64 (totalMessages ) / float64 (duration .Seconds ())
218+ avgMs := float64 (latencies .Mean ()) / 1000.0
215219 p50IngestionMs := float64 (latencies .ValueAtQuantile (50.0 )) / 1000.0
216220 p95IngestionMs := float64 (latencies .ValueAtQuantile (95.0 )) / 1000.0
217221 p99IngestionMs := float64 (latencies .ValueAtQuantile (99.0 )) / 1000.0
@@ -222,8 +226,8 @@ func main() {
222226 fmt .Printf ("Total Errors %d\n " , totalErrors )
223227 fmt .Printf ("Throughput summary: %.0f requests per second\n " , messageRate )
224228 fmt .Printf ("Latency summary (msec):\n " )
225- fmt .Printf (" %9s %9s %9s\n " , "p50" , "p95" , "p99" )
226- fmt .Printf (" %9.3f %9.3f %9.3f\n " , p50IngestionMs , p95IngestionMs , p99IngestionMs )
229+ fmt .Printf (" %9s %9s %9s %9s \n " , "avg " , "p50" , "p95" , "p99" )
230+ fmt .Printf (" %9.3f %9.3f %9.3f %9.3f \n " , avgMs , p50IngestionMs , p95IngestionMs , p99IngestionMs )
227231
228232 if closed {
229233 return
0 commit comments