File: //proc/self/root/opt/go/pkg/mod/github.com/hashicorp/[email protected]/state_test.go
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package memberlist
import (
"bytes"
"fmt"
"log"
"net"
"os"
"reflect"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/armon/go-metrics"
iretry "github.com/hashicorp/memberlist/internal/retry"
"github.com/stretchr/testify/require"
)
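// HostMemberlist builds an unjoined Memberlist named after and bound to the
// given host on an ephemeral port, applying f (if non-nil) to the config
// before creation. Any construction failure aborts the test.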
func HostMemberlist(host string, t *testing.T, f func(*Config)) *Memberlist {
t.Helper()
c := DefaultLANConfig()
c.Name = host
c.BindAddr = host
c.BindPort = 0 // choose a free port
c.Logger = log.New(os.Stderr, host, log.LstdFlags)
if f != nil {
f(c)
}
m, err := newMemberlist(c)
if err != nil {
t.Fatalf("failed to get memberlist: %s", err)
}
return m
}
func TestMemberList_Probe(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.ProbeTimeout = time.Millisecond
c.ProbeInterval = 10 * time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m2.Shutdown()
a1 := alive{
Node: addr1.String(),
Addr: []byte(addr1),
Port: uint16(m1.config.BindPort),
Incarnation: 1,
Vsn: m1.config.BuildVsnArray(),
}
m1.aliveNode(&a1, nil, true)
a2 := alive{
Node: addr2.String(),
Addr: []byte(addr2),
Port: uint16(m2.config.BindPort),
Incarnation: 1,
Vsn: m2.config.BuildVsnArray(),
}
m1.aliveNode(&a2, nil, false)
// should ping addr2
m1.probe()
// Should not be marked suspect
n := m1.nodeMap[addr2.String()]
if n.State != StateAlive {
t.Fatalf("Expect node to be alive")
}
// Should increment seqno
if m1.sequenceNum != 1 {
t.Fatalf("bad seqno %v", m2.sequenceNum)
}
}
func TestMemberList_ProbeNode_Suspect(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
addr3 := getBindAddr()
addr4 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
ip3 := []byte(addr3)
ip4 := []byte(addr4)
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.ProbeTimeout = time.Millisecond
c.ProbeInterval = 10 * time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m2.Shutdown()
m3 := HostMemberlist(addr3.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m3.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()}
m1.aliveNode(&a2, nil, false)
a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1, Vsn: m3.config.BuildVsnArray()}
m1.aliveNode(&a3, nil, false)
a4 := alive{Node: addr4.String(), Addr: ip4, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()}
m1.aliveNode(&a4, nil, false)
n := m1.nodeMap[addr4.String()]
m1.probeNode(n)
// Should be marked suspect.
if n.State != StateSuspect {
t.Fatalf("Expect node to be suspect")
}
time.Sleep(10 * time.Millisecond)
// One of the peers should have attempted an indirect probe.
if s2, s3 := atomic.LoadUint32(&m2.sequenceNum), atomic.LoadUint32(&m3.sequenceNum); s2 != 1 && s3 != 1 {
t.Fatalf("bad seqnos, expected both to be 1: %v, %v", s2, s3)
}
}
func TestMemberList_ProbeNode_Suspect_Dogpile(t *testing.T) {
cases := []struct {
name string
numPeers int
confirmations int
expected time.Duration
}{
{"n=2, k=3 (max timeout disabled)", 1, 0, 500 * time.Millisecond},
{"n=3, k=3", 2, 0, 500 * time.Millisecond},
{"n=4, k=3", 3, 0, 500 * time.Millisecond},
{"n=5, k=3 (max timeout starts to take effect)", 4, 0, 1000 * time.Millisecond},
{"n=6, k=3", 5, 0, 1000 * time.Millisecond},
{"n=6, k=3 (confirmations start to lower timeout)", 5, 1, 750 * time.Millisecond},
{"n=6, k=3", 5, 2, 604 * time.Millisecond},
{"n=6, k=3 (timeout driven to nominal value)", 5, 3, 500 * time.Millisecond},
{"n=6, k=3", 5, 4, 500 * time.Millisecond},
}
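// A sketch of where these expected values come from, assuming the suspicion
// timeout math in suspicion.go: the nominal timeout is
// SuspicionMult * max(1, log10(n)) * ProbeInterval, which with
// SuspicionMult=5, ProbeInterval=100ms, and n <= 10 nodes is 500ms. The
// ceiling is SuspicionMaxTimeoutMult (2) times that, i.e. 1000ms, and only
// applies once the cluster is big enough to expect
// k = SuspicionMult - 2 = 3 confirmations (n - 2 >= k, so n >= 5). Each
// confirmation c then decays the timeout from the ceiling toward the nominal
// value by log(c+1)/log(k+1); e.g. c=2 gives
// 1000ms - 500ms*log(3)/log(4) ~= 604ms.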
for i, c := range cases {
t.Run(c.name, func(t *testing.T) {
// Create the main memberlist under test.
addr := getBindAddr()
m := HostMemberlist(addr.String(), t, func(c *Config) {
c.ProbeTimeout = time.Millisecond
c.ProbeInterval = 100 * time.Millisecond
c.SuspicionMult = 5
c.SuspicionMaxTimeoutMult = 2
})
defer m.Shutdown()
bindPort := m.config.BindPort
a := alive{Node: addr.String(), Addr: []byte(addr), Port: uint16(bindPort), Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, true)
// Make all but one peer a real, live instance.
var peers []*Memberlist
for j := 0; j < c.numPeers-1; j++ {
peerAddr := getBindAddr()
peer := HostMemberlist(peerAddr.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer peer.Shutdown()
peers = append(peers, peer)
a = alive{Node: peerAddr.String(), Addr: []byte(peerAddr), Port: uint16(bindPort), Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, false)
}
// Just use a bogus address for the last peer so it doesn't respond
// to pings, but tell the memberlist it's alive.
badPeerAddr := getBindAddr()
a = alive{Node: badPeerAddr.String(), Addr: []byte(badPeerAddr), Port: uint16(bindPort), Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, false)
// Force a probe, which should start us into the suspect state.
m.probeNodeByAddr(badPeerAddr.String())
if m.getNodeState(badPeerAddr.String()) != StateSuspect {
t.Fatalf("case %d: expected node to be suspect", i)
}
// Add the requested number of confirmations.
for j := 0; j < c.confirmations; j++ {
from := fmt.Sprintf("peer%d", j)
s := suspect{Node: badPeerAddr.String(), Incarnation: 1, From: from}
m.suspectNode(&s)
}
// Wait until right before the timeout and make sure the timer
// hasn't fired.
fudge := 25 * time.Millisecond
time.Sleep(c.expected - fudge)
if m.getNodeState(badPeerAddr.String()) != StateSuspect {
t.Fatalf("case %d: expected node to still be suspect", i)
}
// Wait through the timeout and a little after to make sure the
// timer fires.
time.Sleep(2 * fudge)
if m.getNodeState(badPeerAddr.String()) != StateDead {
t.Fatalf("case %d: expected node to be dead", i)
}
})
}
}
/*
func TestMemberList_ProbeNode_FallbackTCP(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
addr3 := getBindAddr()
addr4 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
ip3 := []byte(addr3)
ip4 := []byte(addr4)
var probeTimeMax time.Duration
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.ProbeTimeout = 10 * time.Millisecond
c.ProbeInterval = 200 * time.Millisecond
probeTimeMax = c.ProbeInterval + 20*time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m2.Shutdown()
m3 := HostMemberlist(addr3.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m3.Shutdown()
m4 := HostMemberlist(addr4.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m4.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a2, nil, false)
a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a3, nil, false)
// Make sure m4 is configured with the same protocol version as m1 so
// the TCP fallback behavior is enabled.
a4 := alive{
Node: addr4.String(),
Addr: ip4,
Port: uint16(bindPort),
Incarnation: 1,
Vsn: []uint8{
ProtocolVersionMin,
ProtocolVersionMax,
m1.config.ProtocolVersion,
m1.config.DelegateProtocolMin,
m1.config.DelegateProtocolMax,
m1.config.DelegateProtocolVersion,
},
}
m1.aliveNode(&a4, nil, false)
// Isolate m4 from UDP traffic by re-opening its listener on the wrong
// port. This should force the TCP fallback path to be used.
var err error
if err = m4.udpListener.Close(); err != nil {
t.Fatalf("err: %v", err)
}
udpAddr := &net.UDPAddr{IP: ip4, Port: 9999}
if m4.udpListener, err = net.ListenUDP("udp", udpAddr); err != nil {
t.Fatalf("err: %v", err)
}
// Have node m1 probe m4.
n := m1.nodeMap[addr4.String()]
startProbe := time.Now()
m1.probeNode(n)
probeTime := time.Now().Sub(startProbe)
// Should be marked alive because of the TCP fallback ping.
if n.State != stateAlive {
t.Fatalf("expect node to be alive")
}
// Make sure TCP activity completed in a timely manner.
if probeTime > probeTimeMax {
t.Fatalf("took to long to probe, %9.6f", probeTime.Seconds())
}
// Confirm at least one of the peers attempted an indirect probe.
time.Sleep(probeTimeMax)
if m2.sequenceNum != 1 && m3.sequenceNum != 1 {
t.Fatalf("bad seqnos %v, %v", m2.sequenceNum, m3.sequenceNum)
}
// Now shutdown all inbound TCP traffic to make sure the TCP fallback
// path properly fails when the node is really unreachable.
if err = m4.tcpListener.Close(); err != nil {
t.Fatalf("err: %v", err)
}
tcpAddr := &net.TCPAddr{IP: ip4, Port: 9999}
if m4.tcpListener, err = net.ListenTCP("tcp", tcpAddr); err != nil {
t.Fatalf("err: %v", err)
}
// Probe again, this time there should be no contact.
startProbe = time.Now()
m1.probeNode(n)
probeTime = time.Now().Sub(startProbe)
// Node should be reported suspect.
if n.State != stateSuspect {
t.Fatalf("expect node to be suspect")
}
// Make sure TCP activity didn't cause us to wait too long before
// timing out.
if probeTime > probeTimeMax {
t.Fatalf("took to long to probe, %9.6f", probeTime.Seconds())
}
// Confirm at least one of the peers attempted an indirect probe.
time.Sleep(probeTimeMax)
if m2.sequenceNum != 2 && m3.sequenceNum != 2 {
t.Fatalf("bad seqnos %v, %v", m2.sequenceNum, m3.sequenceNum)
}
}
func TestMemberList_ProbeNode_FallbackTCP_Disabled(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
addr3 := getBindAddr()
addr4 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
ip3 := []byte(addr3)
ip4 := []byte(addr4)
var probeTimeMax time.Duration
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.ProbeTimeout = 10 * time.Millisecond
c.ProbeInterval = 200 * time.Millisecond
probeTimeMax = c.ProbeInterval + 20*time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m2.Shutdown()
m3 := HostMemberlist(addr3.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m3.Shutdown()
m4 := HostMemberlist(addr4.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m4.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a2, nil, false)
a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a3, nil, false)
// Make sure m4 is configured with the same protocol version as m1 so
// the TCP fallback behavior is enabled.
a4 := alive{
Node: addr4.String(),
Addr: ip4,
Port: uint16(bindPort),
Incarnation: 1,
Vsn: []uint8{
ProtocolVersionMin,
ProtocolVersionMax,
m1.config.ProtocolVersion,
m1.config.DelegateProtocolMin,
m1.config.DelegateProtocolMax,
m1.config.DelegateProtocolVersion,
},
}
m1.aliveNode(&a4, nil, false)
// Isolate m4 from UDP traffic by re-opening its listener on the wrong
// port. This should force the TCP fallback path to be used.
var err error
if err = m4.udpListener.Close(); err != nil {
t.Fatalf("err: %v", err)
}
udpAddr := &net.UDPAddr{IP: ip4, Port: 9999}
if m4.udpListener, err = net.ListenUDP("udp", udpAddr); err != nil {
t.Fatalf("err: %v", err)
}
// Disable the TCP pings using the config mechanism.
m1.config.DisableTcpPings = true
// Have node m1 probe m4.
n := m1.nodeMap[addr4.String()]
startProbe := time.Now()
m1.probeNode(n)
probeTime := time.Now().Sub(startProbe)
// Node should be reported suspect.
if n.State != stateSuspect {
t.Fatalf("expect node to be suspect")
}
// Make sure TCP activity didn't cause us to wait too long before
// timing out.
if probeTime > probeTimeMax {
t.Fatalf("took to long to probe, %9.6f", probeTime.Seconds())
}
// Confirm at least one of the peers attempted an indirect probe.
time.Sleep(probeTimeMax)
if m2.sequenceNum != 1 && m3.sequenceNum != 1 {
t.Fatalf("bad seqnos %v, %v", m2.sequenceNum, m3.sequenceNum)
}
}
func TestMemberList_ProbeNode_FallbackTCP_OldProtocol(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
addr3 := getBindAddr()
addr4 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
ip3 := []byte(addr3)
ip4 := []byte(addr4)
var probeTimeMax time.Duration
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.ProbeTimeout = 10 * time.Millisecond
c.ProbeInterval = 200 * time.Millisecond
probeTimeMax = c.ProbeInterval + 20*time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m2.Shutdown()
m3 := HostMemberlist(addr3.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m3.Shutdown()
m4 := HostMemberlist(addr4.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m4.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a2, nil, false)
a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a3, nil, false)
// Set up m4 so that it doesn't understand a version of the protocol
// that supports TCP pings.
a4 := alive{
Node: addr4.String(),
Addr: ip4,
Port: uint16(bindPort),
Incarnation: 1,
Vsn: []uint8{
ProtocolVersionMin,
ProtocolVersion2Compatible,
ProtocolVersion2Compatible,
m1.config.DelegateProtocolMin,
m1.config.DelegateProtocolMax,
m1.config.DelegateProtocolVersion,
},
}
m1.aliveNode(&a4, nil, false)
// Isolate m4 from UDP traffic by re-opening its listener on the wrong
// port. This should force the TCP fallback path to be used.
var err error
if err = m4.udpListener.Close(); err != nil {
t.Fatalf("err: %v", err)
}
udpAddr := &net.UDPAddr{IP: ip4, Port: 9999}
if m4.udpListener, err = net.ListenUDP("udp", udpAddr); err != nil {
t.Fatalf("err: %v", err)
}
// Have node m1 probe m4.
n := m1.nodeMap[addr4.String()]
startProbe := time.Now()
m1.probeNode(n)
probeTime := time.Now().Sub(startProbe)
// Node should be reported suspect.
if n.State != stateSuspect {
t.Fatalf("expect node to be suspect")
}
// Make sure TCP activity didn't cause us to wait too long before
// timing out.
if probeTime > probeTimeMax {
t.Fatalf("took to long to probe, %9.6f", probeTime.Seconds())
}
// Confirm at least one of the peers attempted an indirect probe.
time.Sleep(probeTimeMax)
if m2.sequenceNum != 1 && m3.sequenceNum != 1 {
t.Fatalf("bad seqnos %v, %v", m2.sequenceNum, m3.sequenceNum)
}
}
*/
func TestMemberList_ProbeNode_Awareness_Degraded(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
addr3 := getBindAddr()
addr4 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
ip3 := []byte(addr3)
ip4 := []byte(addr4)
var probeTimeMin time.Duration
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.ProbeTimeout = 10 * time.Millisecond
c.ProbeInterval = 200 * time.Millisecond
probeTimeMin = 2*c.ProbeInterval - 50*time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
c.ProbeTimeout = 10 * time.Millisecond
c.ProbeInterval = 200 * time.Millisecond
})
defer m2.Shutdown()
m3 := HostMemberlist(addr3.String(), t, func(c *Config) {
c.BindPort = bindPort
c.ProbeTimeout = 10 * time.Millisecond
c.ProbeInterval = 200 * time.Millisecond
})
defer m3.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()}
m1.aliveNode(&a2, nil, false)
a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1, Vsn: m3.config.BuildVsnArray()}
m1.aliveNode(&a3, nil, false)
vsn4 := []uint8{
ProtocolVersionMin, ProtocolVersionMax, ProtocolVersionMin,
1, 1, 1,
}
// Node 4 never gets started.
a4 := alive{Node: addr4.String(), Addr: ip4, Port: uint16(bindPort), Incarnation: 1, Vsn: vsn4}
m1.aliveNode(&a4, nil, false)
// Start the health in a degraded state.
m1.awareness.ApplyDelta(1)
if score := m1.GetHealthScore(); score != 1 {
t.Fatalf("bad: %d", score)
}
// Have node m1 probe m4.
n := m1.nodeMap[addr4.String()]
startProbe := time.Now()
m1.probeNode(n)
probeTime := time.Since(startProbe)
// Node should be reported suspect.
if n.State != StateSuspect {
t.Fatalf("expect node to be suspect")
}
// Make sure we timed out approximately on time (note that we accounted
// for the slowed-down failure detector in the probeTimeMin calculation).
if probeTime < probeTimeMin {
t.Fatalf("probed too quickly, %9.6f", probeTime.Seconds())
}
// Confirm at least one of the peers attempted an indirect probe.
if m2.sequenceNum != 1 && m3.sequenceNum != 1 {
t.Fatalf("bad seqnos %v, %v", m2.sequenceNum, m3.sequenceNum)
}
// We should have gotten all the nacks, so our score should remain the
// same, since we didn't get a successful probe.
if score := m1.GetHealthScore(); score != 1 {
t.Fatalf("bad: %d", score)
}
}
func TestMemberList_ProbeNode_Wrong_VSN(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
addr3 := getBindAddr()
addr4 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
ip3 := []byte(addr3)
ip4 := []byte(addr4)
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.ProbeTimeout = 10 * time.Millisecond
c.ProbeInterval = 200 * time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
c.ProbeTimeout = 10 * time.Millisecond
c.ProbeInterval = 200 * time.Millisecond
})
defer m2.Shutdown()
m3 := HostMemberlist(addr3.String(), t, func(c *Config) {
c.BindPort = bindPort
c.ProbeTimeout = 10 * time.Millisecond
c.ProbeInterval = 200 * time.Millisecond
})
defer m3.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()}
m1.aliveNode(&a2, nil, false)
a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1, Vsn: m3.config.BuildVsnArray()}
m1.aliveNode(&a3, nil, false)
vsn4 := []uint8{
0, 0, 0,
0, 0, 0,
}
// Node 4 never gets started.
a4 := alive{Node: addr4.String(), Addr: ip4, Port: uint16(bindPort), Incarnation: 1, Vsn: vsn4}
m1.aliveNode(&a4, nil, false)
// Start the health in a degraded state.
m1.awareness.ApplyDelta(1)
if score := m1.GetHealthScore(); score != 1 {
t.Fatalf("bad: %d", score)
}
// Node a4 should never have been added to m1's node map, because of its
// invalid protocol version.
if _, ok := m1.nodeMap[addr4.String()]; ok {
t.Fatalf("expected node a4 to be ignored because of its invalid protocol version")
}
}
func TestMemberList_ProbeNode_Awareness_Improved(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.ProbeTimeout = 10 * time.Millisecond
c.ProbeInterval = 200 * time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m2.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()}
m1.aliveNode(&a2, nil, false)
// Start the health in a degraded state.
m1.awareness.ApplyDelta(1)
if score := m1.GetHealthScore(); score != 1 {
t.Fatalf("bad: %d", score)
}
// Have node m1 probe m2.
n := m1.nodeMap[addr2.String()]
m1.probeNode(n)
// Node should be reported alive.
if n.State != StateAlive {
t.Fatalf("expect node to be suspect")
}
// Our score should have improved since we did a good probe.
if score := m1.GetHealthScore(); score != 0 {
t.Fatalf("bad: %d", score)
}
}
func TestMemberList_ProbeNode_Awareness_MissedNack(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
addr3 := getBindAddr()
addr4 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
ip3 := []byte(addr3)
ip4 := []byte(addr4)
var probeTimeMax time.Duration
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.ProbeTimeout = 10 * time.Millisecond
c.ProbeInterval = 200 * time.Millisecond
probeTimeMax = c.ProbeInterval + 50*time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
c.ProbeTimeout = 10 * time.Millisecond
c.ProbeInterval = 200 * time.Millisecond
})
defer m2.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()}
m1.aliveNode(&a2, nil, false)
vsn := m1.config.BuildVsnArray()
// Node 3 and node 4 never get started.
a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1, Vsn: vsn}
m1.aliveNode(&a3, nil, false)
a4 := alive{Node: addr4.String(), Addr: ip4, Port: uint16(bindPort), Incarnation: 1, Vsn: vsn}
m1.aliveNode(&a4, nil, false)
// Make sure health looks good.
if score := m1.GetHealthScore(); score != 0 {
t.Fatalf("bad: %d", score)
}
// Have node m1 probe m4.
n := m1.nodeMap[addr4.String()]
startProbe := time.Now()
m1.probeNode(n)
probeTime := time.Since(startProbe)
// Node should be reported suspect.
m1.nodeLock.Lock()
if n.State != StateSuspect {
t.Fatalf("expect node to be suspect")
}
m1.nodeLock.Unlock()
// Make sure we timed out approximately on time.
if probeTime > probeTimeMax {
t.Fatalf("took to long to probe, %9.6f", probeTime.Seconds())
}
// We should have gotten dinged for the missed nack. Note that the code under
// test is waiting for probeTimeMax and then doing some other work before it
// updates the awareness, so we need to wait some extra time. Rather than just
// add longer and longer sleeps, we'll retry a few times.
iretry.Run(t, func(r *iretry.R) {
if score := m1.GetHealthScore(); score != 1 {
r.Fatalf("expected health score to decrement on missed nack. want %d, "+
"got: %d", 1, score)
}
})
}
func TestMemberList_ProbeNode_Awareness_OldProtocol(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
addr3 := getBindAddr()
addr4 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
ip3 := []byte(addr3)
ip4 := []byte(addr4)
var probeTimeMax time.Duration
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.ProbeTimeout = 10 * time.Millisecond
c.ProbeInterval = 200 * time.Millisecond
probeTimeMax = c.ProbeInterval + 20*time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m2.Shutdown()
m3 := HostMemberlist(addr3.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m3.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a2, nil, false)
a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a3, nil, false)
// Node 4 never gets started.
a4 := alive{Node: addr4.String(), Addr: ip4, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a4, nil, false)
// Make sure health looks good.
if score := m1.GetHealthScore(); score != 0 {
t.Fatalf("bad: %d", score)
}
// Have node m1 probe m4.
n := m1.nodeMap[addr4.String()]
startProbe := time.Now()
m1.probeNode(n)
probeTime := time.Since(startProbe)
// Node should be reported suspect.
if n.State != StateSuspect {
t.Fatalf("expect node to be suspect")
}
// Make sure we timed out approximately on time.
if probeTime > probeTimeMax {
t.Fatalf("took to long to probe, %9.6f", probeTime.Seconds())
}
// Confirm at least one of the peers attempted an indirect probe.
time.Sleep(probeTimeMax)
if m2.sequenceNum != 1 && m3.sequenceNum != 1 {
t.Fatalf("bad seqnos %v, %v", m2.sequenceNum, m3.sequenceNum)
}
// Since we are using the old protocol here, we should have gotten dinged
// for a failed health check.
if score := m1.GetHealthScore(); score != 1 {
t.Fatalf("bad: %d", score)
}
}
func TestMemberList_ProbeNode_Buddy(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.ProbeTimeout = time.Millisecond
c.ProbeInterval = 10 * time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m2.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()}
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()}
m1.aliveNode(&a1, nil, true)
m1.aliveNode(&a2, nil, false)
m2.aliveNode(&a2, nil, true)
// Force the state to suspect so we piggyback a suspect message with the ping.
// We should see this get refuted later, and the ping will succeed.
n := m1.nodeMap[addr2.String()]
n.State = StateSuspect
m1.probeNode(n)
// Make sure a ping was sent.
if m1.sequenceNum != 1 {
t.Fatalf("bad seqno %v", m1.sequenceNum)
}
// Check a broadcast is queued.
if num := m2.broadcasts.NumQueued(); num != 1 {
t.Fatalf("expected only one queued message: %d", num)
}
// Should be alive msg.
if messageType(m2.broadcasts.orderedView(true)[0].b.Message()[0]) != aliveMsg {
t.Fatalf("expected queued alive msg")
}
}
func TestMemberList_ProbeNode(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.ProbeTimeout = time.Millisecond
c.ProbeInterval = 10 * time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m2.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a2, nil, false)
n := m1.nodeMap[addr2.String()]
m1.probeNode(n)
// Should be marked alive
if n.State != StateAlive {
t.Fatalf("Expect node to be alive")
}
// Should increment seqno
if m1.sequenceNum != 1 {
t.Fatalf("bad seqno %v", m1.sequenceNum)
}
}
func TestMemberList_Ping(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.ProbeTimeout = 1 * time.Second
c.ProbeInterval = 10 * time.Second
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m2.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1}
m1.aliveNode(&a2, nil, false)
// Do a legit ping.
n := m1.nodeMap[addr2.String()]
addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(addr2.String(), strconv.Itoa(bindPort)))
if err != nil {
t.Fatalf("err: %v", err)
}
rtt, err := m1.Ping(n.Name, addr)
if err != nil {
t.Fatalf("err: %v", err)
}
if rtt <= 0 {
t.Fatalf("bad: %v", rtt)
}
// This ping has a bad node name, so it should time out.
_, err = m1.Ping("bad", addr)
if _, ok := err.(NoPingResponseError); !ok {
t.Fatalf("bad: %v", err)
}
}
func TestMemberList_ResetNodes(t *testing.T) {
m := GetMemberlist(t, func(c *Config) {
c.GossipToTheDeadTime = 100 * time.Millisecond
})
defer m.Shutdown()
a1 := alive{Node: "test1", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a1, nil, false)
a2 := alive{Node: "test2", Addr: []byte{127, 0, 0, 2}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a2, nil, false)
a3 := alive{Node: "test3", Addr: []byte{127, 0, 0, 3}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a3, nil, false)
d := dead{Node: "test2", Incarnation: 1}
m.deadNode(&d)
m.resetNodes()
if len(m.nodes) != 3 {
t.Fatalf("Bad length")
}
if _, ok := m.nodeMap["test2"]; !ok {
t.Fatalf("test2 should not be unmapped")
}
time.Sleep(200 * time.Millisecond)
m.resetNodes()
if len(m.nodes) != 2 {
t.Fatalf("Bad length")
}
if _, ok := m.nodeMap["test2"]; ok {
t.Fatalf("test2 should be unmapped")
}
}
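// nextSeqNo is assumed here to be a bare atomic increment of m.sequenceNum,
// which is why a zero-value Memberlist is sufficient for this test.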
func TestMemberList_NextSeq(t *testing.T) {
m := &Memberlist{}
if m.nextSeqNo() != 1 {
t.Fatalf("bad sequence no")
}
if m.nextSeqNo() != 2 {
t.Fatalf("bad sequence no")
}
}
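// ackHandlerExists reports whether an ack handler is still registered for
// the given sequence number, holding ackLock for a consistent read.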
func ackHandlerExists(t *testing.T, m *Memberlist, idx uint32) bool {
t.Helper()
m.ackLock.Lock()
_, ok := m.ackHandlers[idx]
m.ackLock.Unlock()
return ok
}
func TestMemberList_setProbeChannels(t *testing.T) {
m := &Memberlist{ackHandlers: make(map[uint32]*ackHandler)}
ch := make(chan ackMessage, 1)
m.setProbeChannels(0, ch, nil, 10*time.Millisecond)
require.True(t, ackHandlerExists(t, m, 0), "missing handler")
time.Sleep(20 * time.Millisecond)
require.False(t, ackHandlerExists(t, m, 0), "non-reaped handler")
}
func TestMemberList_setAckHandler(t *testing.T) {
m := &Memberlist{ackHandlers: make(map[uint32]*ackHandler)}
f := func([]byte, time.Time) {}
m.setAckHandler(0, f, 10*time.Millisecond)
require.True(t, ackHandlerExists(t, m, 0), "missing handler")
time.Sleep(20 * time.Millisecond)
require.False(t, ackHandlerExists(t, m, 0), "non-reaped handler")
}
func TestMemberList_invokeAckHandler(t *testing.T) {
m := &Memberlist{ackHandlers: make(map[uint32]*ackHandler)}
// Does nothing
m.invokeAckHandler(ackResp{}, time.Now())
var b bool
f := func(payload []byte, timestamp time.Time) { b = true }
m.setAckHandler(0, f, 10*time.Millisecond)
// Should set b
m.invokeAckHandler(ackResp{0, nil}, time.Now())
if !b {
t.Fatalf("b not set")
}
require.False(t, ackHandlerExists(t, m, 0), "non-reaped handler")
}
func TestMemberList_invokeAckHandler_Channel_Ack(t *testing.T) {
m := &Memberlist{ackHandlers: make(map[uint32]*ackHandler)}
ack := ackResp{0, []byte{0, 0, 0}}
// Does nothing
m.invokeAckHandler(ack, time.Now())
ackCh := make(chan ackMessage, 1)
nackCh := make(chan struct{}, 1)
m.setProbeChannels(0, ackCh, nackCh, 10*time.Millisecond)
// Should send message
m.invokeAckHandler(ack, time.Now())
select {
case v := <-ackCh:
if !v.Complete {
t.Fatalf("Bad value")
}
if !bytes.Equal(v.Payload, ack.Payload) {
t.Fatalf("wrong payload. expected: %v; actual: %v", ack.Payload, v.Payload)
}
case <-nackCh:
t.Fatalf("should not get a nack")
default:
t.Fatalf("message not sent")
}
require.False(t, ackHandlerExists(t, m, 0), "non-reaped handler")
}
func TestMemberList_invokeAckHandler_Channel_Nack(t *testing.T) {
m := &Memberlist{ackHandlers: make(map[uint32]*ackHandler)}
nack := nackResp{0}
// Does nothing.
m.invokeNackHandler(nack)
ackCh := make(chan ackMessage, 1)
nackCh := make(chan struct{}, 1)
m.setProbeChannels(0, ackCh, nackCh, 10*time.Millisecond)
// Should send message.
m.invokeNackHandler(nack)
select {
case <-ackCh:
t.Fatalf("should not get an ack")
case <-nackCh:
// Good.
default:
t.Fatalf("message not sent")
}
// Getting a nack doesn't reap the handler so that we can still forward
// an ack up to the reap time, if we get one.
require.True(t, ackHandlerExists(t, m, 0), "handler should not be reaped")
ack := ackResp{0, []byte{0, 0, 0}}
m.invokeAckHandler(ack, time.Now())
select {
case v := <-ackCh:
if !v.Complete {
t.Fatalf("Bad value")
}
if !bytes.Equal(v.Payload, ack.Payload) {
t.Fatalf("wrong payload. expected: %v; actual: %v", ack.Payload, v.Payload)
}
case <-nackCh:
t.Fatalf("should not get a nack")
default:
t.Fatalf("message not sent")
}
require.False(t, ackHandlerExists(t, m, 0), "non-reaped handler")
}
func TestMemberList_AliveNode_NewNode(t *testing.T) {
ch := make(chan NodeEvent, 1)
m := GetMemberlist(t, func(c *Config) {
c.Events = &ChannelEventDelegate{ch}
})
defer m.Shutdown()
a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, false)
if len(m.nodes) != 1 {
t.Fatalf("should add node")
}
state, ok := m.nodeMap["test"]
if !ok {
t.Fatalf("should map node")
}
if state.Incarnation != 1 {
t.Fatalf("bad incarnation")
}
if state.State != StateAlive {
t.Fatalf("bad state")
}
if time.Since(state.StateChange) > time.Second {
t.Fatalf("bad change delta")
}
// Check for a join message
select {
case e := <-ch:
if e.Node.Name != "test" {
t.Fatalf("bad node name")
}
default:
t.Fatalf("no join message")
}
// Check a broadcast is queued
if m.broadcasts.NumQueued() != 1 {
t.Fatalf("expected queued message")
}
}
func TestMemberList_AliveNode_SuspectNode(t *testing.T) {
ch := make(chan NodeEvent, 1)
ted := &toggledEventDelegate{
real: &ChannelEventDelegate{ch},
}
m := GetMemberlist(t, func(c *Config) {
c.Events = ted
})
defer m.Shutdown()
a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, false)
// Listen only after first join
ted.Toggle(true)
// Make suspect
state := m.nodeMap["test"]
state.State = StateSuspect
state.StateChange = state.StateChange.Add(-time.Hour)
// Old incarnation number, should not change
m.aliveNode(&a, nil, false)
if state.State != StateSuspect {
t.Fatalf("update with old incarnation!")
}
// Should reset to alive now
a.Incarnation = 2
m.aliveNode(&a, nil, false)
if state.State != StateAlive {
t.Fatalf("no update with new incarnation!")
}
if time.Since(state.StateChange) > time.Second {
t.Fatalf("bad change delta")
}
// Check that no join message was sent
select {
case <-ch:
t.Fatalf("got bad join message")
default:
}
// Check a broadcast is queued
if m.broadcasts.NumQueued() != 1 {
t.Fatalf("expected queued message")
}
}
func TestMemberList_AliveNode_Idempotent(t *testing.T) {
ch := make(chan NodeEvent, 1)
ted := &toggledEventDelegate{
real: &ChannelEventDelegate{ch},
}
m := GetMemberlist(t, func(c *Config) {
c.Events = ted
})
defer m.Shutdown()
a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, false)
// Listen only after first join
ted.Toggle(true)
// Grab the current state
state := m.nodeMap["test"]
stateTime := state.StateChange
// Re-announce with a newer incarnation; the node is already alive
a.Incarnation = 2
m.aliveNode(&a, nil, false)
if state.State != StateAlive {
t.Fatalf("non idempotent")
}
if stateTime != state.StateChange {
t.Fatalf("should not change state")
}
// Check that no join message was sent
select {
case <-ch:
t.Fatalf("got bad join message")
default:
}
// Check a broadcast is queued
if m.broadcasts.NumQueued() != 1 {
t.Fatalf("expected only one queued message")
}
}
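// toggledEventDelegate wraps an EventDelegate and forwards notifications
// only while enabled, letting tests ignore events (such as the initial join)
// until they toggle it on.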
type toggledEventDelegate struct {
mu sync.Mutex
real EventDelegate
enabled bool
}
func (d *toggledEventDelegate) Toggle(enabled bool) {
d.mu.Lock()
defer d.mu.Unlock()
d.enabled = enabled
}
// NotifyJoin is invoked when a node is detected to have joined.
// The Node argument must not be modified.
func (d *toggledEventDelegate) NotifyJoin(n *Node) {
d.mu.Lock()
defer d.mu.Unlock()
if d.enabled {
d.real.NotifyJoin(n)
}
}
// NotifyLeave is invoked when a node is detected to have left.
// The Node argument must not be modified.
func (d *toggledEventDelegate) NotifyLeave(n *Node) {
d.mu.Lock()
defer d.mu.Unlock()
if d.enabled {
d.real.NotifyLeave(n)
}
}
// NotifyUpdate is invoked when a node is detected to have
// updated, usually involving the meta data. The Node argument
// must not be modified.
func (d *toggledEventDelegate) NotifyUpdate(n *Node) {
d.mu.Lock()
defer d.mu.Unlock()
if d.enabled {
d.real.NotifyUpdate(n)
}
}
// Serf bug GH-58: metadata does not update
func TestMemberList_AliveNode_ChangeMeta(t *testing.T) {
ch := make(chan NodeEvent, 1)
ted := &toggledEventDelegate{
real: &ChannelEventDelegate{ch},
}
m := GetMemberlist(t, func(c *Config) {
c.Events = ted
})
defer m.Shutdown()
a := alive{
Node: "test",
Addr: []byte{127, 0, 0, 1},
Meta: []byte("val1"),
Incarnation: 1,
Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, false)
// Listen only after first join
ted.Toggle(true)
// Grab the state
state := m.nodeMap["test"]
// Update the meta with a newer incarnation
a.Incarnation = 2
a.Meta = []byte("val2")
m.aliveNode(&a, nil, false)
// Check updates
if !bytes.Equal(state.Meta, a.Meta) {
t.Fatalf("meta did not update")
}
// Check for a NotifyUpdate
select {
case e := <-ch:
if e.Event != NodeUpdate {
t.Fatalf("bad event: %v", e)
}
if !reflect.DeepEqual(*e.Node, state.Node) {
t.Fatalf("expected %v, got %v", *e.Node, state.Node)
}
if !bytes.Equal(e.Node.Meta, a.Meta) {
t.Fatalf("meta did not update")
}
default:
t.Fatalf("missing event!")
}
}
func TestMemberList_AliveNode_Refute(t *testing.T) {
m := GetMemberlist(t, nil)
defer m.Shutdown()
a := alive{Node: m.config.Name, Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, true)
// Clear queue
m.broadcasts.Reset()
// Conflicting alive
s := alive{
Node: m.config.Name,
Addr: []byte{127, 0, 0, 1},
Incarnation: 2,
Meta: []byte("foo"),
Vsn: m.config.BuildVsnArray(),
}
m.aliveNode(&s, nil, false)
state := m.nodeMap[m.config.Name]
if state.State != StateAlive {
t.Fatalf("should still be alive")
}
if state.Meta != nil {
t.Fatalf("meta should still be nil")
}
// Check a broadcast is queued
if num := m.broadcasts.NumQueued(); num != 1 {
t.Fatalf("expected only one queued message: %d",
num)
}
// Should be an alive msg
if messageType(m.broadcasts.orderedView(true)[0].b.Message()[0]) != aliveMsg {
t.Fatalf("expected queued alive msg")
}
}
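// TestMemberList_AliveNode_Conflict verifies that a conflicting alive message
// (same name, different address/port) is rejected while the node is alive,
// but may reclaim the name once the node has been dead for at least
// DeadNodeReclaimTime.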
func TestMemberList_AliveNode_Conflict(t *testing.T) {
m := GetMemberlist(t, func(c *Config) {
c.DeadNodeReclaimTime = 10 * time.Millisecond
})
defer m.Shutdown()
nodeName := "test"
a := alive{Node: nodeName, Addr: []byte{127, 0, 0, 1}, Port: 8000, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, true)
// Clear queue
m.broadcasts.Reset()
// Conflicting alive
s := alive{
Node: nodeName,
Addr: []byte{127, 0, 0, 2},
Port: 9000,
Incarnation: 2,
Meta: []byte("foo"),
Vsn: m.config.BuildVsnArray(),
}
m.aliveNode(&s, nil, false)
state := m.nodeMap[nodeName]
if state.State != StateAlive {
t.Fatalf("should still be alive")
}
if state.Meta != nil {
t.Fatalf("meta should still be nil")
}
if bytes.Equal(state.Addr, []byte{127, 0, 0, 2}) {
t.Fatalf("address should not be updated")
}
if state.Port == 9000 {
t.Fatalf("port should not be updated")
}
// Check no broadcast is queued
if num := m.broadcasts.NumQueued(); num != 0 {
t.Fatalf("expected 0 queued messages: %d", num)
}
// Change the node to dead
d := dead{Node: nodeName, Incarnation: 2}
m.deadNode(&d)
m.broadcasts.Reset()
state = m.nodeMap[nodeName]
if state.State != StateDead {
t.Fatalf("should be dead")
}
time.Sleep(m.config.DeadNodeReclaimTime)
// New alive node
s2 := alive{
Node: nodeName,
Addr: []byte{127, 0, 0, 2},
Port: 9000,
Incarnation: 3,
Meta: []byte("foo"),
Vsn: m.config.BuildVsnArray(),
}
m.aliveNode(&s2, nil, false)
state = m.nodeMap[nodeName]
if state.State != StateAlive {
t.Fatalf("should still be alive")
}
if !bytes.Equal(state.Meta, []byte("foo")) {
t.Fatalf("meta should be updated")
}
if !bytes.Equal(state.Addr, []byte{127, 0, 0, 2}) {
t.Fatalf("address should be updated")
}
if state.Port != 9000 {
t.Fatalf("port should be updated")
}
}
func TestMemberList_SuspectNode_NoNode(t *testing.T) {
m := GetMemberlist(t, nil)
defer m.Shutdown()
s := suspect{Node: "test", Incarnation: 1}
m.suspectNode(&s)
if len(m.nodes) != 0 {
t.Fatalf("don't expect nodes")
}
}
func TestMemberList_SuspectNode(t *testing.T) {
m := GetMemberlist(t, func(c *Config) {
c.ProbeInterval = time.Millisecond
c.SuspicionMult = 1
})
defer m.Shutdown()
a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, false)
m.changeNode("test", func(state *nodeState) {
state.StateChange = state.StateChange.Add(-time.Hour)
})
s := suspect{Node: "test", Incarnation: 1}
m.suspectNode(&s)
if m.getNodeState("test") != StateSuspect {
t.Fatalf("Bad state")
}
change := m.getNodeStateChange("test")
if time.Since(change) > time.Second {
t.Fatalf("bad change delta")
}
// Check a broadcast is queued
if m.broadcasts.NumQueued() != 1 {
t.Fatalf("expected only one queued message")
}
// Check it's a suspect message
if messageType(m.broadcasts.orderedView(true)[0].b.Message()[0]) != suspectMsg {
t.Fatalf("expected queued suspect msg")
}
// Wait for the timeout
time.Sleep(10 * time.Millisecond)
if m.getNodeState("test") != StateDead {
t.Fatalf("Bad state")
}
newChange := m.getNodeStateChange("test")
if time.Since(newChange) > time.Second {
t.Fatalf("bad change delta")
}
if !newChange.After(change) {
t.Fatalf("should increment time")
}
// Check a broadcast is queued
if m.broadcasts.NumQueued() != 1 {
t.Fatalf("expected only one queued message")
}
// Check it's a dead message
if messageType(m.broadcasts.orderedView(true)[0].b.Message()[0]) != deadMsg {
t.Fatalf("expected queued dead msg")
}
}
func TestMemberList_SuspectNode_DoubleSuspect(t *testing.T) {
m := GetMemberlist(t, nil)
defer m.Shutdown()
a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, false)
state := m.nodeMap["test"]
state.StateChange = state.StateChange.Add(-time.Hour)
s := suspect{Node: "test", Incarnation: 1}
m.suspectNode(&s)
if state.State != StateSuspect {
t.Fatalf("Bad state")
}
change := state.StateChange
if time.Since(change) > time.Second {
t.Fatalf("bad change delta")
}
// clear the broadcast queue
m.broadcasts.Reset()
// Suspect again
m.suspectNode(&s)
if state.StateChange != change {
t.Fatalf("unexpected state change")
}
// Check no broadcast is queued
if m.broadcasts.NumQueued() != 0 {
t.Fatalf("expected no queued messages")
}
}
func TestMemberList_SuspectNode_OldSuspect(t *testing.T) {
m := GetMemberlist(t, nil)
defer m.Shutdown()
a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 10, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, false)
state := m.nodeMap["test"]
state.StateChange = state.StateChange.Add(-time.Hour)
// Clear queue
m.broadcasts.Reset()
s := suspect{Node: "test", Incarnation: 1}
m.suspectNode(&s)
if state.State != StateAlive {
t.Fatalf("Bad state")
}
// Check no broadcast is queued
if m.broadcasts.NumQueued() != 0 {
t.Fatalf("expected no queued messages")
}
}
func TestMemberList_SuspectNode_Refute(t *testing.T) {
m := GetMemberlist(t, nil)
defer m.Shutdown()
a := alive{Node: m.config.Name, Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, true)
// Clear queue
m.broadcasts.Reset()
// Make sure health is in a good state
if score := m.GetHealthScore(); score != 0 {
t.Fatalf("bad: %d", score)
}
s := suspect{Node: m.config.Name, Incarnation: 1}
m.suspectNode(&s)
state := m.nodeMap[m.config.Name]
if state.State != StateAlive {
t.Fatalf("should still be alive")
}
// Check a broadcast is queued
if m.broadcasts.NumQueued() != 1 {
t.Fatalf("expected only one queued message")
}
// Should be an alive msg
if messageType(m.broadcasts.orderedView(true)[0].b.Message()[0]) != aliveMsg {
t.Fatalf("expected queued alive msg")
}
// Health should have been dinged
if score := m.GetHealthScore(); score != 1 {
t.Fatalf("bad: %d", score)
}
}
func TestMemberList_DeadNode_NoNode(t *testing.T) {
m := GetMemberlist(t, nil)
defer m.Shutdown()
d := dead{Node: "test", Incarnation: 1}
m.deadNode(&d)
if len(m.nodes) != 0 {
t.Fatalf("don't expect nodes")
}
}
func TestMemberList_DeadNodeLeft(t *testing.T) {
ch := make(chan NodeEvent, 1)
m := GetMemberlist(t, func(c *Config) {
c.Events = &ChannelEventDelegate{ch}
})
defer m.Shutdown()
nodeName := "node1"
s1 := alive{
Node: nodeName,
Addr: []byte{127, 0, 0, 1},
Port: 8000,
Incarnation: 1,
Vsn: m.config.BuildVsnArray(),
}
m.aliveNode(&s1, nil, false)
// Read the join event
<-ch
d := dead{Node: nodeName, From: nodeName, Incarnation: 1}
m.deadNode(&d)
// Read the dead event
<-ch
state := m.nodeMap[nodeName]
if state.State != StateLeft {
t.Fatalf("Bad state")
}
// Check a broadcast is queued
if m.broadcasts.NumQueued() != 1 {
t.Fatalf("expected only one queued message")
}
// Check it's a dead message
if messageType(m.broadcasts.orderedView(true)[0].b.Message()[0]) != deadMsg {
t.Fatalf("expected queued dead msg")
}
// Clear queue
// m.broadcasts.Reset()
// New alive node
s2 := alive{
Node: nodeName,
Addr: []byte{127, 0, 0, 2},
Port: 9000,
Incarnation: 3,
Meta: []byte("foo"),
Vsn: m.config.BuildVsnArray(),
}
m.aliveNode(&s2, nil, false)
// Read the join event
<-ch
state = m.nodeMap[nodeName]
if state.State != StateAlive {
t.Fatalf("should still be alive")
}
if !bytes.Equal(state.Meta, []byte("foo")) {
t.Fatalf("meta should be updated")
}
if !bytes.Equal(state.Addr, []byte{127, 0, 0, 2}) {
t.Fatalf("address should be updated")
}
if state.Port != 9000 {
t.Fatalf("port should be updated")
}
}
func TestMemberList_DeadNode(t *testing.T) {
ch := make(chan NodeEvent, 1)
m := GetMemberlist(t, func(c *Config) {
c.Events = &ChannelEventDelegate{ch}
})
defer m.Shutdown()
a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, false)
// Read the join event
<-ch
state := m.nodeMap["test"]
state.StateChange = state.StateChange.Add(-time.Hour)
d := dead{Node: "test", Incarnation: 1}
m.deadNode(&d)
if state.State != StateDead {
t.Fatalf("Bad state")
}
change := state.StateChange
if time.Since(change) > time.Second {
t.Fatalf("bad change delta")
}
select {
case leave := <-ch:
if leave.Event != NodeLeave || leave.Node.Name != "test" {
t.Fatalf("bad node name")
}
default:
t.Fatalf("no leave message")
}
// Check a broadcast is queued
if m.broadcasts.NumQueued() != 1 {
t.Fatalf("expected only one queued message")
}
// Check it's a dead message
if messageType(m.broadcasts.orderedView(true)[0].b.Message()[0]) != deadMsg {
t.Fatalf("expected queued dead msg")
}
}
func TestMemberList_DeadNode_Double(t *testing.T) {
ch := make(chan NodeEvent, 1)
m := GetMemberlist(t, nil)
defer m.Shutdown()
a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, false)
state := m.nodeMap["test"]
state.StateChange = state.StateChange.Add(-time.Hour)
d := dead{Node: "test", Incarnation: 1}
m.deadNode(&d)
// Clear queue
m.broadcasts.Reset()
// Notify after the first dead
m.config.Events = &ChannelEventDelegate{ch}
// Should do nothing
d.Incarnation = 2
m.deadNode(&d)
select {
case <-ch:
t.Fatalf("should not get leave")
default:
}
// Check no broadcast is queued
if m.broadcasts.NumQueued() != 0 {
t.Fatalf("expected no queued messages")
}
}
func TestMemberList_DeadNode_OldDead(t *testing.T) {
m := GetMemberlist(t, nil)
defer m.Shutdown()
a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 10, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, false)
state := m.nodeMap["test"]
state.StateChange = state.StateChange.Add(-time.Hour)
d := dead{Node: "test", Incarnation: 1}
m.deadNode(&d)
if state.State != StateAlive {
t.Fatalf("Bad state")
}
}
func TestMemberList_DeadNode_AliveReplay(t *testing.T) {
m := GetMemberlist(t, nil)
defer m.Shutdown()
a := alive{Node: "test", Addr: []byte{127, 0, 0, 1}, Incarnation: 10, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, false)
d := dead{Node: "test", Incarnation: 10}
m.deadNode(&d)
// Replay alive at same incarnation
m.aliveNode(&a, nil, false)
// Should remain dead
state, ok := m.nodeMap["test"]
if ok && state.State != StateDead {
t.Fatalf("Bad state")
}
}
func TestMemberList_DeadNode_Refute(t *testing.T) {
m := GetMemberlist(t, nil)
defer m.Shutdown()
a := alive{Node: m.config.Name, Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a, nil, true)
// Clear queue
m.broadcasts.Reset()
// Make sure health is in a good state
if score := m.GetHealthScore(); score != 0 {
t.Fatalf("bad: %d", score)
}
d := dead{Node: m.config.Name, Incarnation: 1}
m.deadNode(&d)
state := m.nodeMap[m.config.Name]
if state.State != StateAlive {
t.Fatalf("should still be alive")
}
// Check a broadcast is queued
if m.broadcasts.NumQueued() != 1 {
t.Fatalf("expected only one queued message")
}
// Should be an alive msg
if messageType(m.broadcasts.orderedView(true)[0].b.Message()[0]) != aliveMsg {
t.Fatalf("expected queued alive msg")
}
// We should have been dinged
if score := m.GetHealthScore(); score != 1 {
t.Fatalf("bad: %d", score)
}
}
func TestMemberList_MergeState(t *testing.T) {
m := GetMemberlist(t, nil)
defer m.Shutdown()
a1 := alive{Node: "test1", Addr: []byte{127, 0, 0, 1}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a1, nil, false)
a2 := alive{Node: "test2", Addr: []byte{127, 0, 0, 2}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a2, nil, false)
a3 := alive{Node: "test3", Addr: []byte{127, 0, 0, 3}, Incarnation: 1, Vsn: m.config.BuildVsnArray()}
m.aliveNode(&a3, nil, false)
s := suspect{Node: "test1", Incarnation: 1}
m.suspectNode(&s)
remote := []pushNodeState{
pushNodeState{
Name: "test1",
Addr: []byte{127, 0, 0, 1},
Incarnation: 2,
State: StateAlive,
},
pushNodeState{
Name: "test2",
Addr: []byte{127, 0, 0, 2},
Incarnation: 1,
State: StateSuspect,
},
pushNodeState{
Name: "test3",
Addr: []byte{127, 0, 0, 3},
Incarnation: 1,
State: StateDead,
},
pushNodeState{
Name: "test4",
Addr: []byte{127, 0, 0, 4},
Incarnation: 2,
State: StateAlive,
},
}
// Listen for changes
eventCh := make(chan NodeEvent, 1)
m.config.Events = &ChannelEventDelegate{eventCh}
// Merge remote state
m.mergeState(remote)
// Check the states
state := m.nodeMap["test1"]
if state.State != StateAlive || state.Incarnation != 2 {
t.Fatalf("Bad state %v", state)
}
state = m.nodeMap["test2"]
if state.State != StateSuspect || state.Incarnation != 1 {
t.Fatalf("Bad state %v", state)
}
state = m.nodeMap["test3"]
if state.State != StateSuspect {
t.Fatalf("Bad state %v", state)
}
state = m.nodeMap["test4"]
if state.State != StateAlive || state.Incarnation != 2 {
t.Fatalf("Bad state %v", state)
}
// Check the channels
select {
case e := <-eventCh:
if e.Event != NodeJoin || e.Node.Name != "test4" {
t.Fatalf("bad node %v", e)
}
default:
t.Fatalf("Expect join")
}
select {
case e := <-eventCh:
t.Fatalf("Unexpect event: %v", e)
default:
}
}
func TestMemberlist_Gossip(t *testing.T) {
ch := make(chan NodeEvent, 3)
addr1 := getBindAddr()
addr2 := getBindAddr()
addr3 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
ip3 := []byte(addr3)
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
// Set the gossip interval fast enough to get a reasonable test,
// but slow enough to avoid "sendto: operation not permitted"
c.GossipInterval = 10 * time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
c.Events = &ChannelEventDelegate{ch}
// Set the gossip interval fast enough to get a reasonable test,
// but slow enough to avoid "sendto: operation not permitted"
c.GossipInterval = 10 * time.Millisecond
})
defer m2.Shutdown()
m3 := HostMemberlist(addr3.String(), t, func(c *Config) {
c.BindPort = bindPort
})
defer m3.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()}
m1.aliveNode(&a2, nil, false)
a3 := alive{Node: addr3.String(), Addr: ip3, Port: uint16(bindPort), Incarnation: 1, Vsn: m3.config.BuildVsnArray()}
m1.aliveNode(&a3, nil, false)
// Gossip should send all this to m2. Retry a few times because it's UDP and
// timing can make it flaky otherwise.
retry(t, 15, 250*time.Millisecond, func(failf func(string, ...interface{})) {
m1.gossip()
time.Sleep(3 * time.Millisecond)
if len(ch) < 3 {
failf("expected 3 messages from gossip but only got %d", len(ch))
}
})
}
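// retry runs fn up to n times, sleeping w between attempts, and fails the
// test with the last reported error if every attempt calls failf.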
func retry(t *testing.T, n int, w time.Duration, fn func(func(string, ...interface{}))) {
t.Helper()
for try := 1; try <= n; try++ {
failed := false
failFormat := ""
failArgs := []interface{}{}
failf := func(format string, args ...interface{}) {
failed = true
failFormat = format
failArgs = args
}
fn(failf)
if !failed {
return
}
if try == n {
t.Fatalf(failFormat, failArgs...)
}
time.Sleep(w)
}
}
func TestMemberlist_GossipToDead(t *testing.T) {
ch := make(chan NodeEvent, 2)
addr1 := getBindAddr()
addr2 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.GossipInterval = time.Millisecond
c.GossipToTheDeadTime = 100 * time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
c.Events = &ChannelEventDelegate{ch}
})
defer m2.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()}
m1.aliveNode(&a2, nil, false)
// Shouldn't send anything to m2 here, node has been dead for 2x the GossipToTheDeadTime
m1.nodeMap[addr2.String()].State = StateDead
m1.nodeMap[addr2.String()].StateChange = time.Now().Add(-200 * time.Millisecond)
m1.gossip()
select {
case <-ch:
t.Fatalf("shouldn't get gossip")
case <-time.After(50 * time.Millisecond):
}
// Should gossip to m2 because its state has changed within GossipToTheDeadTime
m1.nodeMap[addr2.String()].StateChange = time.Now().Add(-20 * time.Millisecond)
retry(t, 5, 10*time.Millisecond, func(failf func(string, ...interface{})) {
m1.gossip()
time.Sleep(3 * time.Millisecond)
if len(ch) < 2 {
failf("expected 2 messages from gossip")
}
})
}
func TestMemberlist_FailedRemote(t *testing.T) {
type test struct {
name string
err error
expected bool
}
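// The expectation encoded below: failedRemote should report true only for
// errors that implicate the remote end, i.e. tcp dial/read/write op errors
// and udp writes; nil or plain errors, file ops, bare tcp/udp op errors, and
// udp reads are all treated as local problems.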
tests := []test{
{"nil error", nil, false},
{"normal error", fmt.Errorf(""), false},
{"net.OpError for file", &net.OpError{Net: "file"}, false},
{"net.OpError for udp", &net.OpError{Net: "udp"}, false},
{"net.OpError for udp4", &net.OpError{Net: "udp4"}, false},
{"net.OpError for udp6", &net.OpError{Net: "udp6"}, false},
{"net.OpError for tcp", &net.OpError{Net: "tcp"}, false},
{"net.OpError for tcp4", &net.OpError{Net: "tcp4"}, false},
{"net.OpError for tcp6", &net.OpError{Net: "tcp6"}, false},
{"net.OpError for tcp with dial", &net.OpError{Net: "tcp", Op: "dial"}, true},
{"net.OpError for tcp with write", &net.OpError{Net: "tcp", Op: "write"}, true},
{"net.OpError for tcp with read", &net.OpError{Net: "tcp", Op: "read"}, true},
{"net.OpError for udp with write", &net.OpError{Net: "udp", Op: "write"}, true},
{"net.OpError for udp with read", &net.OpError{Net: "udp", Op: "read"}, false},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
actual := failedRemote(test.err)
if actual != test.expected {
t.Fatalf("expected %t, got %t", test.expected, actual)
}
})
}
}
func TestMemberlist_PushPull(t *testing.T) {
addr1 := getBindAddr()
addr2 := getBindAddr()
ip1 := []byte(addr1)
ip2 := []byte(addr2)
sink := registerInMemorySink(t)
ch := make(chan NodeEvent, 3)
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
c.GossipInterval = 10 * time.Second
c.PushPullInterval = time.Millisecond
})
defer m1.Shutdown()
bindPort := m1.config.BindPort
m2 := HostMemberlist(addr2.String(), t, func(c *Config) {
c.BindPort = bindPort
c.GossipInterval = 10 * time.Second
c.Events = &ChannelEventDelegate{ch}
})
defer m2.Shutdown()
a1 := alive{Node: addr1.String(), Addr: ip1, Port: uint16(bindPort), Incarnation: 1, Vsn: m1.config.BuildVsnArray()}
m1.aliveNode(&a1, nil, true)
a2 := alive{Node: addr2.String(), Addr: ip2, Port: uint16(bindPort), Incarnation: 1, Vsn: m2.config.BuildVsnArray()}
m1.aliveNode(&a2, nil, false)
// The push/pull should sync all this to m2. Timing can still make this flaky, so retry a few times.
retry(t, 5, 10*time.Millisecond, func(failf func(string, ...interface{})) {
m1.pushPull()
time.Sleep(3 * time.Millisecond)
if len(ch) < 2 {
failf("expected 2 messages from pushPull")
}
instancesMetricName := "consul.usage.test.memberlist.node.instances"
verifyGaugeExists(t, "consul.usage.test.memberlist.size.local", sink)
verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateAlive.metricsString()), sink)
verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateDead.metricsString()), sink)
verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateLeft.metricsString()), sink)
verifyGaugeExists(t, fmt.Sprintf("%s;node_state=%s", instancesMetricName, StateSuspect.metricsString()), sink)
})
}
func TestVerifyProtocol(t *testing.T) {
cases := []struct {
Anodes [][3]uint8
Bnodes [][3]uint8
expected bool
}{
// Both running identical everything
{
Anodes: [][3]uint8{
{0, 0, 0},
},
Bnodes: [][3]uint8{
{0, 0, 0},
},
expected: true,
},
// One can understand newer, but speaking same protocol
{
Anodes: [][3]uint8{
{0, 0, 0},
},
Bnodes: [][3]uint8{
{0, 1, 0},
},
expected: true,
},
// One is speaking outside the range
{
Anodes: [][3]uint8{
{0, 0, 0},
},
Bnodes: [][3]uint8{
{1, 1, 1},
},
expected: false,
},
// Transitively outside the range
{
Anodes: [][3]uint8{
{0, 1, 0},
{0, 2, 1},
},
Bnodes: [][3]uint8{
{1, 3, 1},
},
expected: false,
},
// Multi-node
{
Anodes: [][3]uint8{
{0, 3, 2},
{0, 2, 0},
},
Bnodes: [][3]uint8{
{0, 2, 1},
{0, 5, 0},
},
expected: true,
},
}
for _, tc := range cases {
aCore := make([][6]uint8, len(tc.Anodes))
aApp := make([][6]uint8, len(tc.Anodes))
for i, n := range tc.Anodes {
aCore[i] = [6]uint8{n[0], n[1], n[2], 0, 0, 0}
aApp[i] = [6]uint8{0, 0, 0, n[0], n[1], n[2]}
}
bCore := make([][6]uint8, len(tc.Bnodes))
bApp := make([][6]uint8, len(tc.Bnodes))
for i, n := range tc.Bnodes {
bCore[i] = [6]uint8{n[0], n[1], n[2], 0, 0, 0}
bApp[i] = [6]uint8{0, 0, 0, n[0], n[1], n[2]}
}
// Test core protocol verification
testVerifyProtocolSingle(t, aCore, bCore, tc.expected)
testVerifyProtocolSingle(t, bCore, aCore, tc.expected)
// Test app protocol verification
testVerifyProtocolSingle(t, aApp, bApp, tc.expected)
testVerifyProtocolSingle(t, bApp, aApp, tc.expected)
}
}
func testVerifyProtocolSingle(t *testing.T, A [][6]uint8, B [][6]uint8, expect bool) {
m := GetMemberlist(t, nil)
defer m.Shutdown()
m.nodes = make([]*nodeState, len(A))
for i, n := range A {
m.nodes[i] = &nodeState{
Node: Node{
PMin: n[0],
PMax: n[1],
PCur: n[2],
DMin: n[3],
DMax: n[4],
DCur: n[5],
},
}
}
remote := make([]pushNodeState, len(B))
for i, n := range B {
remote[i] = pushNodeState{
Name: fmt.Sprintf("node %d", i),
Vsn: []uint8{n[0], n[1], n[2], n[3], n[4], n[5]},
}
}
err := m.verifyProtocol(remote)
if (err == nil) != expect {
t.Fatalf("bad:\nA: %v\nB: %v\nErr: %s", A, B, err)
}
}
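// registerInMemorySink installs a fresh InmemSink as the global go-metrics
// sink so emitted gauges can be asserted on. Note that metrics.NewGlobal
// mutates process-global state, so tests relying on this helper should not
// run in parallel.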
func registerInMemorySink(t *testing.T) *metrics.InmemSink {
t.Helper()
// Only have a single interval for the test
sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute)
cfg := metrics.DefaultConfig("consul.usage.test")
cfg.EnableHostname = false
metrics.NewGlobal(cfg, sink)
return sink
}
func getIntervalMetrics(t *testing.T, sink *metrics.InmemSink) *metrics.IntervalMetrics {
t.Helper()
intervals := sink.Data()
require.Len(t, intervals, 1)
intv := intervals[0]
return intv
}
func verifyGaugeExists(t *testing.T, name string, sink *metrics.InmemSink) {
t.Helper()
interval := getIntervalMetrics(t, sink)
interval.RLock()
defer interval.RUnlock()
if _, ok := interval.Gauges[name]; !ok {
t.Fatalf("%s gauge not emmited", name)
}
}
func verifySampleExists(t *testing.T, name string, sink *metrics.InmemSink) {
t.Helper()
interval := getIntervalMetrics(t, sink)
interval.RLock()
defer interval.RUnlock()
if _, ok := interval.Samples[name]; !ok {
t.Fatalf("%s sample not emmited", name)
}
}