Skip to content

Commit eb32401

Browse files
author
Guy Baron
authored
v1.1.5 rollup to master (#185)
1 parent 900f24f commit eb32401

File tree

21 files changed

+963
-23
lines changed

21 files changed

+963
-23
lines changed

README.md

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,37 +10,30 @@
1010

1111
A lightweight transactional message bus on top of RabbitMQ supporting:
1212

13-
1) Supported messaging semantics
14-
* One Way
15-
* Duplex
16-
* Publish/Subscribe
17-
* Request/Reply (RPC)
18-
2) Long running processes via the [Saga](https://github.com/wework/grabbit/blob/master/docs/SAGA.md) pattern
19-
3) Retry and backoffs
20-
4) Publisher confirms
21-
5) [Reliable messaging](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and local service transactivity via Transaction Outbox pattern
22-
6) Deadlettering
23-
7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md)
24-
8) Reporting [Metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus
25-
9) Distributed [Tracing](https://github.com/wework/grabbit/blob/master/docs/TRACING.md) via OpenTracing
26-
10) [Extensible serialization](https://github.com/wework/grabbit/blob/master/docs/SERIALIZATION.md) with
27-
default support for gob, protobuf and avro
13+
14+
1) Supported [Messaging Styles](https://github.com/wework/grabbit/blob/master/docs/MESSAGING.md)
15+
- One Way (Fire and forget)
16+
- Publish/Subscribe
17+
- Aync Command/Reply
18+
- Blocking Command/Reply (RPC)
19+
2) [Transactional](https://github.com/wework/grabbit/blob/master/docs/TX.md) message processing
20+
3) Message Orchestration via the [Saga](https://github.com/wework/grabbit/blob/master/docs/SAGA.md) pattern
21+
4) At least once reliable messaging via [Transaction Outbox](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and [Publisher Confirms](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md)
22+
5) [Retry and backoffs](https://github.com/wework/grabbit/blob/master/docs/RETRY.md)
23+
6) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md)
24+
7) Reporting [Metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus
25+
8) Distributed [Tracing](https://github.com/wework/grabbit/blob/master/docs/TRACING.md) via OpenTracing
26+
9) [Extensible serialization](https://github.com/wework/grabbit/blob/master/docs/SERIALIZATION.md) with default support for gob, protobuf and avro
2827

2928
## Stable release
3029
the v1.x branch contains the latest stable releases of grabbit and one should track that branch to get point and minor release updates.
3130

3231
## Supported transactional resources
3332
1) MySql > 8.0 (InnoDB)
34-
## Supported serializers
35-
1) gob
36-
2) Avro
37-
3) Protobuf
38-
39-
## Instrumentation
4033

41-
1) Opentracing
34+
## Basic Usage
4235

43-
## Usage
36+
- For a complete sample application see the vacation booking [sample app](https://github.com/wework/grabbit/blob/master/examples/vacation_app) in the examples directory
4437

4538
The following outlines the basic usage of grabbit.
4639
For a complete view of how you would use grabbit including how to write saga's and handle deadlettering refer to grabbit/tests package

docs/MESSAGING.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Messaging
2+
3+
grabbit is all about asynchronous messaging and supports different kinds of interaction patterns.
4+
essentially to work with grabbit you will be registering on a grabbit bus instance handlers for specific types of messages or topics, allowing you to perform your business logic once a message is consumed by grabbit.
5+
Once invoked handlers can reply to incoming messages send or publish messages to other services via the bus.
6+
7+
At its core grabbit distinguishes between messages that target a specific service (commands) and
8+
messages that may target many services (events).
9+
10+
See [README.md](https://github.com/wework/grabbit/blob/master/README.md) or have a look at grabbit's [test suite](https://github.com/wework/grabbit/blob/master/tests/bus_test.go) to learn basic usage
11+
12+
### One Way
13+
14+
All messages in grabbit are essentially one-way messages sent to a RabbitMQ queue or exchange.
15+
More sophisticated complex interactions are all built on one way messaging.
16+
see the [following test](https://github.com/wework/grabbit/blob/master/tests/bus_test.go#L29) as an example.
17+
18+
### Async Command/Reply
19+
20+
When a handlers replies to a command grabbit automatically adds the id of the inbound message to the outbound reply so services can correlate outbound messages with incoming ones.
21+
see the [following test](https://github.com/wework/grabbit/blob/master/tests/bus_test.go#L91) as an example.
22+
23+
### Publish/Subscriber
24+
25+
grabbit allows publishing events to RabbitMQ exchange and topic hierarchies and handlers to subscribe to those events.
26+
Events do not get correlated and can not be replied to.
27+
28+
see the [following test](https://github.com/wework/grabbit/blob/master/tests/bus_test.go#L112) as an example.
29+
30+
### Blocking Command/Reply (RPC)
31+
32+
it is sometimes beneficial to simulate blocking semantics over an async command/reply message exchange.
33+
In particular, it might come in handly when front end web applications need to call backend queued services.
34+
grabbit allows this scenario by providing the RPC API over the bus interface.
35+
36+
see the [following test](https://github.com/wework/grabbit/blob/master/tests/bus_test.go#L215) as an example.
37+

docs/RETRY.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Retry and Backoffs
2+
3+
When the invocation of a grabbit handler fails grabbit will retry the handler and perform a jittered [binary exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff#Binary_exponential_backoff_algorithm).
4+
5+
You can configure the number of retries and control the backoff time factor by passing in a gbus.BusConfiguration instance to the builder interface.
6+
7+
MaxRetryCount configures the maximum number of retries grabbit will try executing the handler before rejecting the message.
8+
9+
BaseRetryDuration is the base duration in milliseconds inputted into the backoff algorithm.
10+
With a binary exponential backoff algorithm, the time between each retry attempt is calculated as 2^[retry attempt] * BaseRetryDuration + [random jitter (a few nanoseconds) ]
11+
12+
the default MaxRetryCount is 3 and BaseRetryDuration is 10ms
13+
Given the above configuration, grabbit will try retrying according to the following
14+
15+
first retry after ~20ms
16+
second retry after ~40ms
17+
third retry after ~80ms
18+
19+
```go
20+
21+
package main
22+
23+
import (
24+
"github.com/wework/grabbit/gbus"
25+
"github.com/wework/grabbit/gbus/builder"
26+
)
27+
28+
bus := builder.
29+
New().
30+
Bus("rabbitmq connection string").
31+
WithLogger(logger).
32+
WorkerNum(3, 1).
33+
WithConfirms().
34+
WithConfiguration(gbus.BusConfiguration{
35+
MaxRetryCount: 5,
36+
BaseRetryDuration: 15,
37+
}).
38+
Txnl("mysql", "database connection string").
39+
Build("your service name")
40+
41+
```
42+

docs/TX.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Transactional Support
2+
3+
grabbit executes handlers within the scope of an active transaction.
4+
When handlers use the passed in transaction instance to persist their business objects
5+
grabbit guarantees local transactivity by bounding business objects persistence, outgoing command, reply and event messages the handler issues in a single transaction and routing messages to the [Transaction Outbox](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md).
6+
7+
The following demonstrates how to access the active transaction from within a handler
8+
9+
In this example, the updating of the orders table, publishing of the OrderCanceledEvent event and sending the OrderCanceledReply reply message are all bound to the same transaction.
10+
11+
12+
13+
```go
14+
15+
func SomeHandler(invocation gbus.Invocation, message *gbus.BusMessage) error{
16+
cancelOrderCommand := message.Payload.(CancelOrderCommand)
17+
if e := invocation.Tx().Exec("UPDATE orders SET status = 'Canceled' WHERE order_id=?", cancelOrderCommand.orderID); e != nil{
18+
return e
19+
}
20+
orderCanceledEvent := gbus.NewBusMessage(OrderCanceledEvent{
21+
OrderID: cancelOrderCommand.orderID,
22+
})
23+
if e := invocation.Bus().Publish(invocation.Ctx(), "some_exchange", "order.canceled", orderCanceledEvent); e := nil{
24+
return e
25+
}
26+
27+
orderCanceledReply := gbus.NewBusMessage(OrderCanceledReply{})
28+
return invocation.Reply(invocation.Ctx(), orderCanceledReply)
29+
}
30+
31+
```

examples/vacation_app/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
vacation_app

examples/vacation_app/.gitingnore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
vacation_app

examples/vacation_app/README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
## grabbit example vacation app
2+
3+
The following example simulates a vacation booking app and demonstrates the use of grabbit
4+
to manage the booking process.
5+
6+
The app is made up out of the following components
7+
8+
- The client, a console app that you enter your vacation destination and calls the booking service to
9+
book the vacation.
10+
11+
- Booking Service manages the booking saga calling the hotels and flights services to book hotels and flights for the requested destination. it handles as well as applying flight cancelation in case the hotel service can not book a hotel.
12+
The booking saga sends back a message to the client with the result of the booking request once the saga completes.
13+
14+
- Flights Service, books flights to the requested destination and replies back the response.
15+
The flight service.
16+
17+
- Hotels Service, books hotels for the requested destination and replies back the response.
18+
In this example requesting to book a vacation in Oslo results in a reply message with a Failed status
19+
Triggering the booking saga to send a command to the flight service to cancel the booked flights.
20+
21+
## building and running the example
22+
23+
- go build
24+
- docker-compose up
25+
- run the booking servce: vacation_app booking
26+
- run the hotels servce: vacation_app hotels
27+
- run the flights servce: vacation_app flights
28+
- run the client: vacation_app client
29+
30+
Once the services are running you can enter a vacation destination in the client app which will invoke the booking saga which will orchestrate the flights and hotels services.
31+
32+
33+
You can see a trace of the calls by browsing to the Jeager client at http://localhost:16686
34+
35+
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package cmd
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"os"
7+
"vacation_app/trace"
8+
9+
log "github.com/sirupsen/logrus"
10+
"github.com/spf13/cobra"
11+
"vacation_app/saga"
12+
)
13+
14+
func init() {
15+
rootCmd.AddCommand(runBookingServiceeCmd)
16+
}
17+
18+
var runBookingServiceeCmd = &cobra.Command{
19+
Use: "booking",
20+
Short: "Run the booking service",
21+
Run: func(cmd *cobra.Command, args []string) {
22+
svcName := "booking-service"
23+
closer, err := trace.CreatetJeagerTracer(svcName)
24+
25+
if err != nil {
26+
log.Printf("Could not initialize jaeger tracer: %s", err.Error())
27+
return
28+
}
29+
30+
defer closer.Close()
31+
gb := createBus(svcName)
32+
33+
gb.RegisterSaga(&saga.BookingSaga{})
34+
gb.Start()
35+
defer gb.Shutdown()
36+
37+
fmt.Print("Booking service running...press any key to exit...\n")
38+
reader := bufio.NewReader(os.Stdin)
39+
reader.ReadString('\n')
40+
},
41+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package cmd
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"fmt"
7+
"os"
8+
"strings"
9+
"vacation_app/messages"
10+
"vacation_app/trace"
11+
12+
log "github.com/sirupsen/logrus"
13+
"github.com/spf13/cobra"
14+
"github.com/wework/grabbit/gbus"
15+
)
16+
17+
18+
var runClientCmd = &cobra.Command{
19+
Use: "client",
20+
Short: "Run the client app",
21+
Run: func(cmd *cobra.Command, args []string) {
22+
fmt.Println("\033]0;Title goes here\007")
23+
log.SetFormatter(&log.TextFormatter{ForceColors: true})
24+
log.SetLevel(log.ErrorLevel)
25+
26+
svcName := "client"
27+
closer, err := trace.CreatetJeagerTracer(svcName)
28+
29+
if err != nil {
30+
log.Printf("Could not initialize jaeger tracer: %s", err.Error())
31+
return
32+
}
33+
defer closer.Close()
34+
gb := createBus(svcName)
35+
36+
gb.HandleMessage(messages.BookingComplete{}, HandleBookingComplete)
37+
gb.Start()
38+
defer gb.Shutdown()
39+
40+
for {
41+
fmt.Print("Enter destination ...\n")
42+
reader := bufio.NewReader(os.Stdin)
43+
dest, _ := reader.ReadString('\n')
44+
dest = strings.TrimSpace(dest)
45+
bookVacationCmd := gbus.NewBusMessage(messages.BookVacationCmd{
46+
Destination: dest,
47+
})
48+
49+
gb.Send(context.Background(), "booking-service", bookVacationCmd)
50+
51+
fmt.Printf("booking vacation for destination %s\n", dest)
52+
53+
}
54+
},
55+
}
56+
57+
func HandleBookingComplete(invocation gbus.Invocation, message *gbus.BusMessage) error {
58+
bookingComplete := message.Payload.(*messages.BookingComplete)
59+
if bookingComplete.Success {
60+
fmt.Printf("booking completed succesfully\n")
61+
62+
} else {
63+
fmt.Printf("failed to book vacation\n")
64+
}
65+
return nil
66+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package cmd
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"os"
7+
"vacation_app/trace"
8+
"vacation_app/messages"
9+
10+
log "github.com/sirupsen/logrus"
11+
"github.com/spf13/cobra"
12+
"github.com/wework/grabbit/gbus"
13+
)
14+
15+
16+
17+
var runFlightsgServiceCmd = &cobra.Command{
18+
Use: "flights",
19+
Short: "Run the flights service",
20+
Run: func(cmd *cobra.Command, args []string) {
21+
svcName := "flights-service"
22+
closer, err := trace.CreatetJeagerTracer(svcName)
23+
24+
if err != nil {
25+
log.Printf("Could not initialize jaeger tracer: %s", err.Error())
26+
return
27+
}
28+
29+
defer closer.Close()
30+
gb := createBus(svcName)
31+
32+
gb.HandleMessage(messages.BookFlightsCmd{}, HandleBookFlightCommand)
33+
gb.HandleMessage(messages.CancelFlightsCmd{}, HandleCancelFlightCommand)
34+
35+
gb.Start()
36+
defer gb.Shutdown()
37+
38+
fmt.Print("Flights service running...press any key to exit...\n")
39+
reader := bufio.NewReader(os.Stdin)
40+
reader.ReadString('\n')
41+
},
42+
}
43+
44+
func HandleBookFlightCommand(invocation gbus.Invocation, message *gbus.BusMessage) error {
45+
cmd := message.Payload.(*messages.BookFlightsCmd)
46+
invocation.Log().Infof("booking flight to %s", cmd.Destination)
47+
reply := gbus.NewBusMessage(messages.BookFlightsRsp{
48+
Success: true,
49+
})
50+
invocation.Reply(invocation.Ctx(), reply)
51+
52+
return nil
53+
}
54+
55+
func HandleCancelFlightCommand(invocation gbus.Invocation, message *gbus.BusMessage) error {
56+
cmd := message.Payload.(*messages.CancelFlightsCmd)
57+
invocation.Log().Infof("canceling flight to %s", cmd.Destination)
58+
reply := gbus.NewBusMessage(messages.CancelFlightsRsp{
59+
Success: true,
60+
})
61+
invocation.Reply(invocation.Ctx(), reply)
62+
63+
return nil
64+
}

0 commit comments

Comments
 (0)