From 39b34508a9bcb80e177bde14a8265fec3982abaa Mon Sep 17 00:00:00 2001 From: Martin Angers Date: Wed, 26 Jan 2022 08:13:01 -0500 Subject: [PATCH] Try to fix flaky publisher-has-listener redis test (#3876) --- server/datastore/redis/redis_external_test.go | 75 +++++++++++-------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/server/datastore/redis/redis_external_test.go b/server/datastore/redis/redis_external_test.go index 6473603f66..c12c3af274 100644 --- a/server/datastore/redis/redis_external_test.go +++ b/server/datastore/redis/redis_external_test.go @@ -153,6 +153,23 @@ func TestBindConn(t *testing.T) { func TestPublishHasListeners(t *testing.T) { const prefix = "TestPublishHasListeners:" + const defaultTimeout = 5 * time.Second + + waitForSub := func(t *testing.T, psc redigo.PubSubConn, timeout time.Duration) { + start := time.Now() + var loopOk bool + loop: + for time.Since(start) < timeout { + msg := psc.ReceiveWithTimeout(time.Second) + switch msg := msg.(type) { + case redigo.Subscription: + require.Equal(t, msg.Count, 1) + loopOk = true + break loop + } + } + require.True(t, loopOk, "timed out") + } t.Run("standalone", func(t *testing.T) { pool := redistest.SetupRedis(t, prefix, false, false, false) @@ -168,35 +185,22 @@ func TestPublishHasListeners(t *testing.T) { psc := redigo.PubSubConn{Conn: sconn} require.NoError(t, psc.Subscribe(prefix+"a")) - - start := time.Now() - var loopOk bool - loop1: - for time.Since(start) < 2*time.Second { - msg := psc.Receive() - switch msg := msg.(type) { - case redigo.Subscription: - require.Equal(t, msg.Count, 1) - loopOk = true - break loop1 - } - } - require.True(t, loopOk, "timed out") + waitForSub(t, psc, defaultTimeout) ok, err = redis.PublishHasListeners(pool, pconn, prefix+"a", "B") require.NoError(t, err) require.True(t, ok) - start = time.Now() - loopOk = false - loop2: - for time.Since(start) < 2*time.Second { - msg := psc.Receive() + start := time.Now() + loopOk := false + loop: + for 
time.Since(start) < defaultTimeout { + msg := psc.ReceiveWithTimeout(time.Second) switch msg := msg.(type) { case redigo.Message: require.Equal(t, "B", string(msg.Data)) loopOk = true - break loop2 + break loop } } require.True(t, loopOk, "timed out") @@ -218,31 +222,42 @@ func TestPublishHasListeners(t *testing.T) { redis.BindConn(pool, sconn, "b") psc := redigo.PubSubConn{Conn: sconn} require.NoError(t, psc.Subscribe(prefix+"{a}")) + waitForSub(t, psc, defaultTimeout) - // a standard PUBLISH returns 0 + // a standard PUBLISH returns 0, because there are no subscribers *on this + // particular node* n, err := redigo.Int(pconn.Do("PUBLISH", prefix+"{a}", "B")) require.NoError(t, err) require.Equal(t, 0, n) - // but this returns true + // but PublishHasListeners returns true, as it checks for subscribers on + // each node in the cluster ok, err = redis.PublishHasListeners(pool, pconn, prefix+"{a}", "C") require.NoError(t, err) require.True(t, ok) + // wait to receive the messages - note that both B and C will be received, + // but order is not guaranteed as the publishing was done on a distinct + // node and the cluster protocol forwards them to every node in the + // cluster. start := time.Now() - want := "B" + want := map[string]bool{"B": false, "C": false} var loopOk bool loop: - for time.Since(start) < 2*time.Second { - msg := psc.Receive() + for time.Since(start) < defaultTimeout { + msg := psc.ReceiveWithTimeout(time.Second) switch msg := msg.(type) { case redigo.Message: - require.Equal(t, want, string(msg.Data)) - if want == "C" { - loopOk = true - break loop + _, ok := want[string(msg.Data)] + require.True(t, ok) // must be one of the expected messages + want[string(msg.Data)] = true + for _, v := range want { + if !v { + continue loop + } } - want = "C" + loopOk = true + break loop } } require.True(t, loopOk, "timed out")