# Performance in Storm

Measuring the performance of a Bullet instance running on a multi-tenant Storm cluster is not an easy task. There are a lot of independent variables that we could vary, each with an effect on performance, including:

1. The amount of data we consume
2. The number of simultaneous queries Bullet runs
3. The kinds of simultaneous queries - ```RAW```, ```GROUP```, ```COUNT DISTINCT```, etc.
4. The parallelisms of the components - for example, increasing the parallelism of the Filter bolts disproportionately to the others
5. The hardware configuration of the machines
6. The Storm version
7. How free the cluster is and the kinds of topologies running on it - CPU heavy, disk/memory heavy, network heavy, etc.
8. The source of the data and how consumption from it is tuned

...and many more.

In this section, we will focus primarily on analyzing the effects of 1 and 2, while keeping the others fixed, or as fixed as possible, on a multi-tenant Storm cluster.

!!! note "Work in Progress"

    This page is a work in progress. It will continue to be updated and iterated upon. The idea is to give users some insight into how Bullet runs in general and how to tweak Bullet components. This is *not* a benchmark and your mileage may vary.


## Prerequisites

The rest of this document assumes that you are familiar with [Storm](http://storm.apache.org), [Kafka](http://kafka.apache.org) and the [Bullet on Storm architecture](storm-architecture.md).

## How was this tested?

All tests here were run using [Bullet-Storm 0.3.1](https://github.com/yahoo/bullet-storm/releases/tag/bullet-storm-0.3.1). The intent is to test just the Storm piece without going through the Web Service or the UI. The DRPC REST endpoint provided by Storm lets us do just that.
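
For instance, this lets us submit a query with nothing but ```curl```. A minimal sketch is below; the host ```drpc-host```, the port (Storm's default ```drpc.http.port```), and ```bullet``` as the registered DRPC function name are all assumptions that depend on your cluster and topology settings:

```bash
# Submit the simplest Bullet query (an empty query) straight to Storm's DRPC
# REST endpoint. The host, port, and function name here are placeholders for
# your own cluster and topology settings.
curl -s -X POST -d '{}' http://drpc-host:3774/drpc/bullet
```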

Using the pluggable metrics interface in Bullet on Storm, various worker-level metrics such as CPU time, heap usage, and GC times and types were captured and sent to a Yahoo in-house monitoring service for time-slicing and graphing. The graphs shown in the tests below use this service. See [0.3.0](https://github.com/yahoo/bullet-storm/releases/tag/bullet-storm-0.3.0) for details on how to plug in your own metrics collection.

### Cluster

* Running a custom build of Storm - Storm 0.10.2 with Storm 1.0+ features backported. For all intents and purposes, it's Storm 1.0.3
* Machines were of different configurations, but all Storm workers for Bullet were running on:
    - 2 x Intel E5-2680v3 (12 cores, 24 threads each) - one reserved core gives each machine 47 cores from the Storm scheduler's point of view
    - 256 GB RAM
    - 4 TB SATA Disk
    - 10 G Network Interface
* Multi-tenant cluster with other topologies running. Average utilizations ranging from *70% - 90%*

### Data

* Reading from a Kafka 0.9.0.1 topic partitioned into 64 partitions
* The Kafka cluster was located within the same datacenter as the Storm cluster - close network proximity gives us some measure of confidence that large data transmission delays aren't a factor.
* The metrics for the Kafka topic for the month before testing:
    - The data records or messages per second (mps) for the topic ranged from 43,000 mps to 207,000 mps, with an average of 172,000 mps
    - The compressed data output of the topic ranged from 22 MB/s to 128 MB/s, with an average of 78 MB/s. Compressed data is roughly 80% of the size of the uncompressed data

Since there is a lot of variance, for each of the tests below, the data volume at that time will be provided in this format: ```Data: XX MPS and XX MB/s```, where each number is the average of that metric over the hour before the test was run.

### Configuration

Here is the default configuration we used to launch the basic instance of Bullet on Storm:

```YAML
bullet.topology.metrics.enable: true
bullet.topology.metrics.built.in.enable: true
bullet.topology.metrics.built.in.emit.interval.mapping:
  bullet_active_rules: 5
  default: 60
bullet.topology.metrics.classes:
  - "package.containing.our.custom.class.pushing.metrics"
bullet.topology.drpc.spout.parallelism: 2
bullet.topology.drpc.spout.cpu.load: 20.0
bullet.topology.drpc.spout.memory.on.heap.load: 128.0
bullet.topology.drpc.spout.memory.off.heap.load: 192.0
bullet.topology.prepare.bolt.parallelism: 1
bullet.topology.prepare.bolt.cpu.load: 20.0
bullet.topology.prepare.bolt.memory.on.heap.load: 128.0
bullet.topology.prepare.bolt.memory.off.heap.load: 192.0
bullet.topology.filter.bolt.parallelism: 128
bullet.topology.filter.bolt.cpu.load: 100.0
bullet.topology.filter.bolt.memory.on.heap.load: 1024.0
bullet.topology.filter.bolt.memory.off.heap.load: 192.0
bullet.topology.return.bolt.parallelism: 1
bullet.topology.return.bolt.cpu.load: 20.0
bullet.topology.return.bolt.memory.on.heap.load: 128.0
bullet.topology.return.bolt.memory.off.heap.load: 192.0
bullet.topology.join.bolt.parallelism: 2
bullet.topology.join.bolt.cpu.load: 50.0
bullet.topology.join.bolt.memory.on.heap.load: 512.0
bullet.topology.join.bolt.memory.off.heap.load: 192.0
bullet.topology.join.bolt.error.tick.timeout: 3
bullet.topology.join.bolt.rule.tick.timeout: 3
bullet.topology.tick.interval.secs: 1
bullet.rule.default.duration: 30000
bullet.rule.max.duration: 540000
bullet.rule.aggregation.max.size: 512
bullet.rule.aggregation.raw.max.size: 500
```
Any settings not listed here default to the values in [bullet_defaults.yaml](https://github.com/yahoo/bullet-storm/blob/bullet-storm-0.3.0/src/main/resources/bullet_defaults.yaml). In particular, **metadata collection** and **timestamp injection** are enabled. ```RAW``` type queries also micro-batch by size 1 (in other words, they do not micro-batch).

The topology was also launched with the following Storm settings (passed as command-line arguments to Storm):

```bash
storm jar
    ...
    --bullet-spout-parallelism 64 \
    --bullet-spout-cpu-load 100.0 \
    --bullet-spout-on-heap-memory-load 1536.0 \
    --bullet-spout-off-heap-memory-load 192.0 \
    -c topology.acker.executors=256 \
    -c topology.max.spout.pending=20000 \
    -c topology.backpressure.enable=false \
    -c topology.worker.max.heap.size.mb=4096.0 \
    -c topology.worker.gc.childopts="-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled -XX:NewRatio=1" \
    ...
```

1. The spout parallelism is 64 because it is going to read from a Kafka topic with 64 partitions (any more is meaningless since the topic cannot be split further). It reads and converts the data into Bullet Records.
2. We've fanned out from the spouts to the Filter bolts by a ratio of 2. We will see if we actually need this ratio below.
3. We use ```topology.max.spout.pending=20000``` to limit the number of in-flight tuples there can be from a DataSource Spout instance and to throttle it if too many queries are slowing down processing downstream. This is set pretty high to account for catch-up and skew in our Kafka partitions.
4. We have set the max heap size for a worker to ```4 GiB``` since we do not want workers that are too large. If a component dies or causes a worker to be killed by RAS, it will not affect too many other workers. It also keeps heap dumps and the like manageable.
5. We set ```topology.worker.gc.childopts``` to use ```ParNewGC``` and ```CMS```. These are our cluster defaults, but we are listing them here since this may not be true for all Storm clusters. We have also added ```-XX:NewRatio=1``` to the defaults since most of our objects are short-lived, and having a larger Young Generation reduces our Young Generation GC (ParNew) frequency.

### Resource utilization

With these configurations, the total resource usage is:

<div class="mostly-numeric-table"></div>

| Component           | Parallelism | CPU (cores) | Total Memory (MB) |
| :------------------ | ----------: | ----------: | ----------------: |
| DataSource Spout    | 64          | 64          | 110592            |
| Filter Bolt         | 128         | 128         | 155648            |
| Join Bolt           | 2           | 1           | 1408              |
| DRPC Spout          | 2           | 0.4         | 640               |
| PrepareRequest Bolt | 1           | 0.2         | 320               |
| ReturnResults Bolt  | 1           | 0.2         | 320               |
| IMetricsConsumer    | 1           | 0.1         | 256               |
| Ackers              | 256         | 25.6        | 32768             |
| **Total**           | **455**     | **219.5**   | **334720**        |

Since these machines expose ~47 cores each to the Storm scheduler, we would need ```5``` of them (219.5 / 47 ≈ 4.67, rounded up) to run this instance of Bullet reading this data source and supporting a certain number of simultaneous queries. What this certain number is, we will determine below.
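
The per-component numbers above follow directly from the [configuration](#configuration): each component's total memory is its parallelism multiplied by the sum of its on-heap and off-heap loads. A quick sketch of the arithmetic:

```bash
# Total memory per component = parallelism x (on-heap MB + off-heap MB)
echo $(( 64 * (1536 + 192) ))   # DataSource Spout: 110592 MB
echo $(( 128 * (1024 + 192) ))  # Filter Bolt: 155648 MB
echo $(( 2 * (512 + 192) ))     # Join Bolt: 1408 MB
# Machines needed = total CPU / usable cores per machine, rounded up
python -c "import math; print(math.ceil(219.5 / 47))"  # 5
```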

!!! note "How will this work for different data and query loads?"

    It won't. We will list changes to settings as we change data loads and queries. The settings listed above are the baseline.


### Tools

* [jq](https://stedolan.github.io/jq/) - a nice tool to parse Bullet JSON responses
* curl, bash and python - for running and analyzing Bullet queries
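
As a sketch of how the tests below were run and picked apart with these tools (the host, port, and DRPC function name are the same placeholder assumptions as before):

```bash
# Run a Bullet query through the DRPC REST endpoint and inspect the response.
RESULT=$(curl -s -X POST -d '{"aggregation": {"type": "RAW", "size": 1}}' \
              http://drpc-host:3774/drpc/bullet)
echo "${RESULT}" | jq '.records | length'  # how many records came back
echo "${RESULT}" | jq '.meta'              # metadata, including the timestamps used below
```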

## Measuring the inherent latency of Bullet

This test runs with the [standard configuration](#configuration) above.

We are [running this query](../ws/examples.md#simplest-query) in this test. While not a particularly common query for normal users to run, this ```RAW``` query without any filters will serve to measure the intrinsic delay added by Bullet. The record pulled out has a timestamp for when it was emitted into Kafka, Bullet injects a timestamp into the record when the Filter bolt receives it, and metadata collection logs timestamps for when the query was received and terminated. Together, these let us measure the end-to-end latency of getting one record through Bullet. We can also use the timestamp of query submission to determine the end-to-end latency of getting a query through Bullet.

The following table shows the timestamps averaged over **100** runs of this query. The delays below are shown *relative* to the Query Received timestamp (when the query was received by Bullet at the Join bolt).

<div class="mostly-numeric-table"></div>

| Timestamp       | Delay (ms) |
| :-------------- | ---------: |
| Kafka Received  | -705.79    |
| Bullet Received | -1.01      |
| Query Received  | 0          |
| Query Finished  | 1.74       |

This table shows that Bullet adds a delay of ```1.74 ms``` just to pull out a record. Note that the Bullet Received timestamp above is negative. This is because the Filter bolt received the query and emitted an arbitrary record ```1.01 ms``` before the Join bolt received the query. The data was submitted into Kafka about ```705.79 ms``` before the query was received by Bullet; that difference is the processing time of Kafka plus the time for our spouts to read the data into Bullet.

This result shows that this is the fastest Bullet can be. It cannot return data any faster than this for meaningful queries.

## Measuring the time to find a record

This test runs with the [standard configuration](#configuration) above.

The [last test](#measuring-the-inherent-latency-of-bullet) attempted to measure how long Bullet takes to pick out a record. Here we will measure how long it takes to find a record *that we generate*. The results are the average of **100** queries run over a 30-minute interval, each trying to filter for a record with a single unique value in a field, [similar to this query](../ws/examples.md#simple-filtering).
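
A sketch of the shape of such a query is below; the field name and the value are hypothetical stand-ins for the unique value we generated, and the endpoint is the same placeholder as before:

```bash
# Filter for a record whose "id" field (hypothetical) matches our generated value.
curl -s -X POST http://drpc-host:3774/drpc/bullet -d '{
  "filters": [{"field": "id", "operation": "==", "values": ["unique-generated-value"]}],
  "aggregation": {"type": "RAW", "size": 1}
}'
```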

Since this query actually requires us to look at the values in the data, we should also mention that the average data volume during this test was: ```Data: 164,000 MPS and 107 MB/s```.

<div class="mostly-numeric-table"></div>

| Timestamp       | Delay (ms) |
| :-------------- | ---------: |
| Kafka Received  | 519.25     |
| Bullet Received | 1161.43    |
| Query Received  | 0          |
| Query Finished  | 1165.96    |

Now, we're getting somewhere! We see that Bullet took on average ```1165.96 ms - 1161.43 ms = 4.53 ms``` from the time it saw the record in the Filter bolt to when it finished the query and returned the record. Notice that the record was emitted into Kafka ```519.25 ms``` after the query was received. That delay is the time it takes for the generated record to flow through our network and into Kafka.

It is difficult to isolate how much of the ```1161.43 ms - 519.25 ms = 642.18 ms``` was spent in Kafka and how much was spent reading the record from our spout and sending it to the Filter bolt. However, we can look at this time as a whole and include it in the time it takes to get data into Bullet.

!!! note "So, Bullet takes ~5 ms to find a record?"

    No, not really. Remember that we are only including the time from when the record was matched in the Filter bolt to when it was sent out from the Join bolt. We can only conclude that the true delay is less than ```1165.96 ms - 519.25 ms = 646.71 ms```, because that is the difference in time from when the record was emitted into Kafka to when it was emitted out of Bullet. It is less than that because a part of that time is Kafka accepting the record and making it available for consumption. Nevertheless, finding a single record in a data stream of ```164,000 mps``` in about half a second with about [5 machines](#resource-utilization) is not bad at all!

# More coming soon!