Skip to content

Commit 18475c2

Browse files
authored
Add ability to inject *redis.Client into producer & consumer (robinjoseph08#9)
1 parent 13b26dc commit 18475c2

File tree

6 files changed

+73
-28
lines changed

6 files changed

+73
-28
lines changed

consumer.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,13 @@ type ConsumerOptions struct {
5252
BufferSize int
5353
// Concurrency dictates how many goroutines to spawn to handle the messages.
5454
Concurrency int
55-
// RedisOptions is how you configure the underlying Redis connection. More
56-
// info here: https://godoc.org/github.com/go-redis/redis#Options.
55+
// RedisClient supersedes the RedisOptions field, and allows you to inject
56+
// an already-made *redis.Client for use in the consumer.
57+
RedisClient *redis.Client
58+
// RedisOptions allows you to configure the underlying Redis connection.
59+
// More info here: https://godoc.org/github.com/go-redis/redis#Options.
60+
//
61+
// This field is used if RedisClient field is nil.
5762
RedisOptions *RedisOptions
5863
}
5964

@@ -115,9 +120,16 @@ func NewConsumerWithOptions(options *ConsumerOptions) (*Consumer, error) {
115120
options.ReclaimInterval = 1 * time.Second
116121
}
117122

118-
r, err := newRedisClient(options.RedisOptions)
119-
if err != nil {
120-
return nil, errors.Wrap(err, "error creating redis client")
123+
var r *redis.Client
124+
125+
if options.RedisClient != nil {
126+
r = options.RedisClient
127+
} else {
128+
r = newRedisClient(options.RedisOptions)
129+
}
130+
131+
if err := redisPreflightChecks(r); err != nil {
132+
return nil, err
121133
}
122134

123135
return &Consumer{

consumer_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,19 @@ func TestNewConsumerWithOptions(t *testing.T) {
4141
assert.Equal(tt, 1*time.Second, c.options.ReclaimInterval)
4242
})
4343

44-
t.Run("allows override of Name, GroupName, BlockingTimeout, and ReclaimTimeout", func(tt *testing.T) {
44+
t.Run("allows override of Name, GroupName, BlockingTimeout, ReclaimTimeout, and RedisClient", func(tt *testing.T) {
45+
rc := newRedisClient(nil)
46+
4547
c, err := NewConsumerWithOptions(&ConsumerOptions{
4648
Name: "test_name",
4749
GroupName: "test_group_name",
4850
BlockingTimeout: 10 * time.Second,
4951
ReclaimInterval: 10 * time.Second,
52+
RedisClient: rc,
5053
})
5154
require.NoError(tt, err)
5255

56+
assert.Equal(tt, rc, c.redis)
5357
assert.Equal(tt, "test_name", c.options.Name)
5458
assert.Equal(tt, "test_group_name", c.options.GroupName)
5559
assert.Equal(tt, 10*time.Second, c.options.BlockingTimeout)

producer.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,13 @@ type ProducerOptions struct {
1919
// option. This allows the stream trimming to done in a more efficient
2020
// manner. More info here: https://redis.io/commands/xadd#capped-streams.
2121
ApproximateMaxLength bool
22-
// RedisOptions is how you configure the underlying Redis connection. More
23-
// info here: https://godoc.org/github.com/go-redis/redis#Options.
22+
// RedisClient supersedes the RedisOptions field, and allows you to inject
23+
// an already-made *redis.Client for use in the consumer.
24+
RedisClient *redis.Client
25+
// RedisOptions allows you to configure the underlying Redis connection.
26+
// More info here: https://godoc.org/github.com/go-redis/redis#Options.
27+
//
28+
// This field is used if RedisClient field is nil.
2429
RedisOptions *RedisOptions
2530
}
2631

@@ -45,8 +50,15 @@ func NewProducer() (*Producer, error) {
4550

4651
// NewProducerWithOptions creates a Producer using custom ProducerOptions.
4752
func NewProducerWithOptions(options *ProducerOptions) (*Producer, error) {
48-
r, err := newRedisClient(options.RedisOptions)
49-
if err != nil {
53+
var r *redis.Client
54+
55+
if options.RedisClient != nil {
56+
r = options.RedisClient
57+
} else {
58+
r = newRedisClient(options.RedisOptions)
59+
}
60+
61+
if err := redisPreflightChecks(r); err != nil {
5062
return nil, err
5163
}
5264

producer_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,18 @@ func TestNewProducerWithOptions(t *testing.T) {
2424
assert.NotNil(tt, p)
2525
})
2626

27+
t.Run("allows custom *redis.Client", func(tt *testing.T) {
28+
rc := newRedisClient(nil)
29+
30+
p, err := NewProducerWithOptions(&ProducerOptions{
31+
RedisClient: rc,
32+
})
33+
require.NoError(tt, err)
34+
35+
assert.NotNil(tt, p)
36+
assert.Equal(tt, rc, p.redis)
37+
})
38+
2739
t.Run("bubbles up errors", func(tt *testing.T) {
2840
_, err := NewProducerWithOptions(&ProducerOptions{
2941
RedisOptions: &RedisOptions{Addr: "localhost:0"},

redis.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,36 +17,39 @@ var redisVersionRE = regexp.MustCompile(`redis_version:(.+)`)
1717
type RedisOptions = redis.Options
1818

1919
// newRedisClient creates a new Redis client with the given options. If options
20-
// is nil, it will use default options. In addition to creating the client, it
21-
// also ensures that it can connect to the actual instance and that the instance
22-
// supports Redis streams (i.e. it's at least v5).
23-
func newRedisClient(options *RedisOptions) (*redis.Client, error) {
20+
// is nil, it will use default options.
21+
func newRedisClient(options *RedisOptions) *redis.Client {
2422
if options == nil {
2523
options = &RedisOptions{}
2624
}
27-
client := redis.NewClient(options)
25+
return redis.NewClient(options)
26+
}
2827

29-
// make sure Redis supports streams (i.e. is at least v5)
28+
// redisPreflightChecks makes sure the Redis instance backing the *redis.Client
29+
// offers the functionality we need. Specifically, it also that it can connect
30+
// to the actual instance and that the instance supports Redis streams (i.e.
31+
// it's at least v5).
32+
func redisPreflightChecks(client *redis.Client) error {
3033
info, err := client.Info("server").Result()
3134
if err != nil {
32-
return nil, err
35+
return err
3336
}
3437

3538
match := redisVersionRE.FindAllStringSubmatch(info, -1)
3639
if len(match) < 1 {
37-
return nil, fmt.Errorf("could not extract redis version")
40+
return fmt.Errorf("could not extract redis version")
3841
}
3942
version := strings.TrimSpace(match[0][1])
4043
parts := strings.Split(version, ".")
4144
major, err := strconv.Atoi(parts[0])
4245
if err != nil {
43-
return nil, err
46+
return err
4447
}
4548
if major < 5 {
46-
return nil, fmt.Errorf("redis streams are not supported in version %q", version)
49+
return fmt.Errorf("redis streams are not supported in version %q", version)
4750
}
4851

49-
return client, nil
52+
return nil
5053
}
5154

5255
// incrementMessageID takes in a message ID (e.g. 1564886140363-0) and

redis_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,26 @@ import (
1010
func TestNewRedisClient(t *testing.T) {
1111
t.Run("returns a new redis client", func(tt *testing.T) {
1212
options := &RedisOptions{}
13-
r, err := newRedisClient(options)
14-
require.NoError(tt, err)
13+
r := newRedisClient(options)
1514

16-
err = r.Ping().Err()
15+
err := r.Ping().Err()
1716
assert.NoError(tt, err)
1817
})
1918

2019
t.Run("defaults options if it's nil", func(tt *testing.T) {
21-
r, err := newRedisClient(nil)
22-
require.NoError(tt, err)
20+
r := newRedisClient(nil)
2321

24-
err = r.Ping().Err()
22+
err := r.Ping().Err()
2523
assert.NoError(tt, err)
2624
})
25+
}
2726

27+
func TestRedisPreflightChecks(t *testing.T) {
2828
t.Run("bubbles up errors", func(tt *testing.T) {
2929
options := &RedisOptions{Addr: "localhost:0"}
30-
_, err := newRedisClient(options)
30+
r := newRedisClient(options)
31+
32+
err := redisPreflightChecks(r)
3133
require.Error(tt, err)
3234

3335
assert.Contains(tt, err.Error(), "dial tcp")

0 commit comments

Comments
 (0)