mirror of
https://github.com/fleetdm/fleet
synced 2026-05-23 08:58:41 +00:00
Try to fix flaky publisher-has-listener redis test (#3876)
This commit is contained in:
parent
7895636335
commit
39b34508a9
1 changed files with 45 additions and 30 deletions
|
|
@ -153,6 +153,23 @@ func TestBindConn(t *testing.T) {
|
|||
|
||||
func TestPublishHasListeners(t *testing.T) {
|
||||
const prefix = "TestPublishHasListeners:"
|
||||
const defaultTimeout = 5 * time.Second
|
||||
|
||||
waitForSub := func(t *testing.T, psc redigo.PubSubConn, timeout time.Duration) {
|
||||
start := time.Now()
|
||||
var loopOk bool
|
||||
loop:
|
||||
for time.Since(start) < timeout {
|
||||
msg := psc.ReceiveWithTimeout(time.Second)
|
||||
switch msg := msg.(type) {
|
||||
case redigo.Subscription:
|
||||
require.Equal(t, msg.Count, 1)
|
||||
loopOk = true
|
||||
break loop
|
||||
}
|
||||
}
|
||||
require.True(t, loopOk, "timed out")
|
||||
}
|
||||
|
||||
t.Run("standalone", func(t *testing.T) {
|
||||
pool := redistest.SetupRedis(t, prefix, false, false, false)
|
||||
|
|
@ -168,35 +185,22 @@ func TestPublishHasListeners(t *testing.T) {
|
|||
|
||||
psc := redigo.PubSubConn{Conn: sconn}
|
||||
require.NoError(t, psc.Subscribe(prefix+"a"))
|
||||
|
||||
start := time.Now()
|
||||
var loopOk bool
|
||||
loop1:
|
||||
for time.Since(start) < 2*time.Second {
|
||||
msg := psc.Receive()
|
||||
switch msg := msg.(type) {
|
||||
case redigo.Subscription:
|
||||
require.Equal(t, msg.Count, 1)
|
||||
loopOk = true
|
||||
break loop1
|
||||
}
|
||||
}
|
||||
require.True(t, loopOk, "timed out")
|
||||
waitForSub(t, psc, defaultTimeout)
|
||||
|
||||
ok, err = redis.PublishHasListeners(pool, pconn, prefix+"a", "B")
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
|
||||
start = time.Now()
|
||||
loopOk = false
|
||||
loop2:
|
||||
for time.Since(start) < 2*time.Second {
|
||||
msg := psc.Receive()
|
||||
start := time.Now()
|
||||
loopOk := false
|
||||
loop:
|
||||
for time.Since(start) < defaultTimeout {
|
||||
msg := psc.ReceiveWithTimeout(time.Second)
|
||||
switch msg := msg.(type) {
|
||||
case redigo.Message:
|
||||
require.Equal(t, "B", string(msg.Data))
|
||||
loopOk = true
|
||||
break loop2
|
||||
break loop
|
||||
}
|
||||
}
|
||||
require.True(t, loopOk, "timed out")
|
||||
|
|
@ -218,31 +222,42 @@ func TestPublishHasListeners(t *testing.T) {
|
|||
redis.BindConn(pool, sconn, "b")
|
||||
psc := redigo.PubSubConn{Conn: sconn}
|
||||
require.NoError(t, psc.Subscribe(prefix+"{a}"))
|
||||
waitForSub(t, psc, defaultTimeout)
|
||||
|
||||
// a standard PUBLISH returns 0
|
||||
// a standard PUBLISH returns 0, because there are no subscribers *on this
|
||||
// particular node*
|
||||
n, err := redigo.Int(pconn.Do("PUBLISH", prefix+"{a}", "B"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, n)
|
||||
|
||||
// but this returns true
|
||||
// but PublishHasListeners returns true, as it checks for subscribers on
|
||||
// each node in the cluster
|
||||
ok, err = redis.PublishHasListeners(pool, pconn, prefix+"{a}", "C")
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
|
||||
// wait to receive the messages - note that both B and C will be received,
|
||||
// but order is not guaranteed as the publishing was done on a distinct
|
||||
// node and the cluster protocol forwards them on every node in the
|
||||
// cluster.
|
||||
start := time.Now()
|
||||
want := "B"
|
||||
want := map[string]bool{"B": false, "C": false}
|
||||
var loopOk bool
|
||||
loop:
|
||||
for time.Since(start) < 2*time.Second {
|
||||
msg := psc.Receive()
|
||||
for time.Since(start) < defaultTimeout {
|
||||
msg := psc.ReceiveWithTimeout(time.Second)
|
||||
switch msg := msg.(type) {
|
||||
case redigo.Message:
|
||||
require.Equal(t, want, string(msg.Data))
|
||||
if want == "C" {
|
||||
loopOk = true
|
||||
break loop
|
||||
_, ok := want[string(msg.Data)]
|
||||
require.True(t, ok) // must be one of the expected messages
|
||||
want[string(msg.Data)] = true
|
||||
for _, v := range want {
|
||||
if !v {
|
||||
continue loop
|
||||
}
|
||||
}
|
||||
want = "C"
|
||||
loopOk = true
|
||||
break loop
|
||||
}
|
||||
}
|
||||
require.True(t, loopOk, "timed out")
|
||||
|
|
|
|||
Loading…
Reference in a new issue