File: //proc/self/root/opt/go/pkg/mod/github.com/hashicorp/
[email protected]/queue_test.go
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package memberlist
import (
"testing"
"github.com/google/btree"
"github.com/stretchr/testify/require"
)
func TestLimitedBroadcastLess(t *testing.T) {
cases := []struct {
Name string
A *limitedBroadcast // lesser
B *limitedBroadcast
}{
{
"diff-transmits",
&limitedBroadcast{transmits: 0, msgLen: 10, id: 100},
&limitedBroadcast{transmits: 1, msgLen: 10, id: 100},
},
{
"same-transmits--diff-len",
&limitedBroadcast{transmits: 0, msgLen: 12, id: 100},
&limitedBroadcast{transmits: 0, msgLen: 10, id: 100},
},
{
"same-transmits--same-len--diff-id",
&limitedBroadcast{transmits: 0, msgLen: 12, id: 100},
&limitedBroadcast{transmits: 0, msgLen: 12, id: 90},
},
}
for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
a, b := c.A, c.B
require.True(t, a.Less(b))
tree := btree.New(32)
tree.ReplaceOrInsert(b)
tree.ReplaceOrInsert(a)
min := tree.Min().(*limitedBroadcast)
require.Equal(t, a.transmits, min.transmits)
require.Equal(t, a.msgLen, min.msgLen)
require.Equal(t, a.id, min.id)
max := tree.Max().(*limitedBroadcast)
require.Equal(t, b.transmits, max.transmits)
require.Equal(t, b.msgLen, max.msgLen)
require.Equal(t, b.id, max.id)
})
}
}
func TestTransmitLimited_Queue(t *testing.T) {
q := &TransmitLimitedQueue{RetransmitMult: 1, NumNodes: func() int { return 1 }}
q.QueueBroadcast(&memberlistBroadcast{"test", nil, nil})
q.QueueBroadcast(&memberlistBroadcast{"foo", nil, nil})
q.QueueBroadcast(&memberlistBroadcast{"bar", nil, nil})
if q.NumQueued() != 3 {
t.Fatalf("bad len")
}
dump := q.orderedView(true)
if dump[0].b.(*memberlistBroadcast).node != "test" {
t.Fatalf("missing test")
}
if dump[1].b.(*memberlistBroadcast).node != "foo" {
t.Fatalf("missing foo")
}
if dump[2].b.(*memberlistBroadcast).node != "bar" {
t.Fatalf("missing bar")
}
// Should invalidate previous message
q.QueueBroadcast(&memberlistBroadcast{"test", nil, nil})
if q.NumQueued() != 3 {
t.Fatalf("bad len")
}
dump = q.orderedView(true)
if dump[0].b.(*memberlistBroadcast).node != "foo" {
t.Fatalf("missing foo")
}
if dump[1].b.(*memberlistBroadcast).node != "bar" {
t.Fatalf("missing bar")
}
if dump[2].b.(*memberlistBroadcast).node != "test" {
t.Fatalf("missing test")
}
}
func TestTransmitLimited_GetBroadcasts(t *testing.T) {
q := &TransmitLimitedQueue{RetransmitMult: 3, NumNodes: func() int { return 10 }}
// 18 bytes per message
q.QueueBroadcast(&memberlistBroadcast{"test", []byte("1. this is a test."), nil})
q.QueueBroadcast(&memberlistBroadcast{"foo", []byte("2. this is a test."), nil})
q.QueueBroadcast(&memberlistBroadcast{"bar", []byte("3. this is a test."), nil})
q.QueueBroadcast(&memberlistBroadcast{"baz", []byte("4. this is a test."), nil})
// 2 byte overhead per message, should get all 4 messages
all := q.GetBroadcasts(2, 80)
require.Equal(t, 4, len(all), "missing messages: %v", prettyPrintMessages(all))
// 3 byte overhead, should only get 3 messages back
partial := q.GetBroadcasts(3, 80)
require.Equal(t, 3, len(partial), "missing messages: %v", prettyPrintMessages(partial))
}
func TestTransmitLimited_GetBroadcasts_Limit(t *testing.T) {
q := &TransmitLimitedQueue{RetransmitMult: 1, NumNodes: func() int { return 10 }}
require.Equal(t, int64(0), q.idGen, "the id generator seed starts at zero")
require.Equal(t, 2, retransmitLimit(q.RetransmitMult, q.NumNodes()), "sanity check transmit limits")
// 18 bytes per message
q.QueueBroadcast(&memberlistBroadcast{"test", []byte("1. this is a test."), nil})
q.QueueBroadcast(&memberlistBroadcast{"foo", []byte("2. this is a test."), nil})
q.QueueBroadcast(&memberlistBroadcast{"bar", []byte("3. this is a test."), nil})
q.QueueBroadcast(&memberlistBroadcast{"baz", []byte("4. this is a test."), nil})
require.Equal(t, int64(4), q.idGen, "we handed out 4 IDs")
// 3 byte overhead, should only get 3 messages back
partial1 := q.GetBroadcasts(3, 80)
require.Equal(t, 3, len(partial1), "missing messages: %v", prettyPrintMessages(partial1))
require.Equal(t, int64(4), q.idGen, "id generator doesn't reset until empty")
partial2 := q.GetBroadcasts(3, 80)
require.Equal(t, 3, len(partial2), "missing messages: %v", prettyPrintMessages(partial2))
require.Equal(t, int64(4), q.idGen, "id generator doesn't reset until empty")
// Only two not expired
partial3 := q.GetBroadcasts(3, 80)
require.Equal(t, 2, len(partial3), "missing messages: %v", prettyPrintMessages(partial3))
require.Equal(t, int64(0), q.idGen, "id generator resets on empty")
// Should get nothing
partial5 := q.GetBroadcasts(3, 80)
require.Equal(t, 0, len(partial5), "missing messages: %v", prettyPrintMessages(partial5))
require.Equal(t, int64(0), q.idGen, "id generator resets on empty")
}
func prettyPrintMessages(msgs [][]byte) []string {
var out []string
for _, msg := range msgs {
out = append(out, "'"+string(msg)+"'")
}
return out
}
func TestTransmitLimited_Prune(t *testing.T) {
q := &TransmitLimitedQueue{RetransmitMult: 1, NumNodes: func() int { return 10 }}
ch1 := make(chan struct{}, 1)
ch2 := make(chan struct{}, 1)
// 18 bytes per message
q.QueueBroadcast(&memberlistBroadcast{"test", []byte("1. this is a test."), ch1})
q.QueueBroadcast(&memberlistBroadcast{"foo", []byte("2. this is a test."), ch2})
q.QueueBroadcast(&memberlistBroadcast{"bar", []byte("3. this is a test."), nil})
q.QueueBroadcast(&memberlistBroadcast{"baz", []byte("4. this is a test."), nil})
// Keep only 2
q.Prune(2)
require.Equal(t, 2, q.NumQueued())
// Should notify the first two
select {
case <-ch1:
default:
t.Fatalf("expected invalidation")
}
select {
case <-ch2:
default:
t.Fatalf("expected invalidation")
}
dump := q.orderedView(true)
if dump[0].b.(*memberlistBroadcast).node != "bar" {
t.Fatalf("missing bar")
}
if dump[1].b.(*memberlistBroadcast).node != "baz" {
t.Fatalf("missing baz")
}
}
func TestTransmitLimited_ordering(t *testing.T) {
q := &TransmitLimitedQueue{RetransmitMult: 1, NumNodes: func() int { return 10 }}
insert := func(name string, transmits int) {
q.queueBroadcast(&memberlistBroadcast{name, []byte(name), make(chan struct{})}, transmits)
}
insert("node0", 0)
insert("node1", 10)
insert("node2", 3)
insert("node3", 4)
insert("node4", 7)
dump := q.orderedView(true)
if dump[0].transmits != 10 {
t.Fatalf("bad val %v, %d", dump[0].b.(*memberlistBroadcast).node, dump[0].transmits)
}
if dump[1].transmits != 7 {
t.Fatalf("bad val %v, %d", dump[7].b.(*memberlistBroadcast).node, dump[7].transmits)
}
if dump[2].transmits != 4 {
t.Fatalf("bad val %v, %d", dump[2].b.(*memberlistBroadcast).node, dump[2].transmits)
}
if dump[3].transmits != 3 {
t.Fatalf("bad val %v, %d", dump[3].b.(*memberlistBroadcast).node, dump[3].transmits)
}
if dump[4].transmits != 0 {
t.Fatalf("bad val %v, %d", dump[4].b.(*memberlistBroadcast).node, dump[4].transmits)
}
}