Awesome Open Source
Awesome Open Source

Rabbus 🚌 ✨

  • A tiny wrapper over amqp exchanges and queues.
  • In memory retries with exponential backoff for sending messages.
  • Protect producer calls with circuit breaker.
  • Automatic reconnect to RabbitMQ broker when connection is lost.
  • Go channel API.

Installation

go get -u github.com/rafaeljesus/rabbus

Usage

The rabbus package exposes an interface for emitting and listening RabbitMQ messages.

Emit

import (
	"context"
	"time"

	"github.com/rafaeljesus/rabbus"
)

func main() {
	timeout := time.After(time.Second * 3)
	cbStateChangeFunc := func(name, from, to string) {
		// do something when state is changed
	}
	r, err := rabbus.New(
		rabbusDsn,
		rabbus.Durable(true),
		rabbus.Attempts(5),
		rabbus.Sleep(time.Second*2),
		rabbus.Threshold(3),
		rabbus.OnStateChange(cbStateChangeFunc),
	)
	if err != nil {
		// handle error
	}

	defer func(r Rabbus) {
		if err := r.Close(); err != nil {
			// handle error
		}
	}(r)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go r.Run(ctx)

	msg := rabbus.Message{
		Exchange: "test_ex",
		Kind:     "topic",
		Key:      "test_key",
		Payload:  []byte(`foo`),
	}

	r.EmitAsync() <- msg

	for {
		select {
		case <-r.EmitOk():
			// message was sent
		case <-r.EmitErr():
			// failed to send message
		case <-timeout:
			// handle timeout error
		}
	}
}

Listen

import (
	"context"
	"encoding/json"
	"time"

	"github.com/rafaeljesus/rabbus"
)

func main() {
	timeout := time.After(time.Second * 3)
	cbStateChangeFunc := func(name, from, to string) {
		// do something when state is changed
	}
	r, err := rabbus.New(
		rabbusDsn,
		rabbus.Durable(true),
		rabbus.Attempts(5),
		rabbus.Sleep(time.Second*2),
		rabbus.Threshold(3),
		rabbus.OnStateChange(cbStateChangeFunc),
	)
	if err != nil {
		// handle error
	}

	defer func(r Rabbus) {
		if err := r.Close(); err != nil {
			// handle error
		}
	}(r)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go r.Run(ctx)

	messages, err := r.Listen(rabbus.ListenConfig{
		Exchange:    "events_ex",
		Kind:        "topic",
		Key:         "events_key",
		Queue:       "events_q",
		DeclareArgs: rabbus.NewDeclareArgs().WithMessageTTL(15 * time.Minute).With("foo", "bar"),
		BindArgs:    rabbus.NewBindArgs().With("baz", "qux"),
	})
	if err != nil {
		// handle errors during adding listener
	}
	defer close(messages)

	go func(messages chan ConsumerMessage) {
		for m := range messages {
			m.Ack(false)
		}
	}(messages)
}

Contributing

  • Fork it
  • Create your feature branch (git checkout -b my-new-feature)
  • Commit your changes (git commit -am 'Add some feature')
  • Push to the branch (git push origin my-new-feature)
  • Create new Pull Request

Badges

Build Status Go Report Card Go Doc


GitHub @rafaeljesus  ·  Medium @_jesus_rafael  ·  Twitter @_jesus_rafael

Related Awesome Lists
Top Programming Languages
Top Projects

Get A Weekly Email With Trending Projects For These Topics
No Spam. Unsubscribe easily at any time.
Golang (155,873
Microservices (11,956
Rabbitmq (6,227
Amqp (1,978
Event Bus (1,514
Event Driven (709
Backoff (540
Circuit Breaker (407
Resilience (360