File: //proc/self/root/opt/go/pkg/mod/github.com/hashicorp/
[email protected]/transport_test.go
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package memberlist
import (
"log"
"net"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestTransport_Join(t *testing.T) {
net := &MockNetwork{}
t1 := net.NewTransport("node1")
c1 := DefaultLANConfig()
c1.Name = "node1"
c1.Transport = t1
m1, err := Create(c1)
if err != nil {
t.Fatalf("err: %v", err)
}
m1.setAlive()
m1.schedule()
defer m1.Shutdown()
c2 := DefaultLANConfig()
c2.Name = "node2"
c2.Transport = net.NewTransport("node2")
m2, err := Create(c2)
if err != nil {
t.Fatalf("err: %v", err)
}
m2.setAlive()
m2.schedule()
defer m2.Shutdown()
num, err := m2.Join([]string{c1.Name + "/" + t1.addr.String()})
if num != 1 {
t.Fatalf("bad: %d", num)
}
if err != nil {
t.Fatalf("err: %v", err)
}
if len(m2.Members()) != 2 {
t.Fatalf("bad: %v", m2.Members())
}
if m2.estNumNodes() != 2 {
t.Fatalf("bad: %v", m2.Members())
}
}
func TestTransport_Send(t *testing.T) {
net := &MockNetwork{}
t1 := net.NewTransport("node1")
d1 := &MockDelegate{}
c1 := DefaultLANConfig()
c1.Name = "node1"
c1.Transport = t1
c1.Delegate = d1
m1, err := Create(c1)
if err != nil {
t.Fatalf("err: %v", err)
}
m1.setAlive()
m1.schedule()
defer m1.Shutdown()
c2 := DefaultLANConfig()
c2.Name = "node2"
c2.Transport = net.NewTransport("node2")
m2, err := Create(c2)
if err != nil {
t.Fatalf("err: %v", err)
}
m2.setAlive()
m2.schedule()
defer m2.Shutdown()
num, err := m2.Join([]string{c1.Name + "/" + t1.addr.String()})
if num != 1 {
t.Fatalf("bad: %d", num)
}
if err != nil {
t.Fatalf("err: %v", err)
}
if err := m2.SendTo(t1.addr, []byte("SendTo")); err != nil {
t.Fatalf("err: %v", err)
}
var n1 *Node
for _, n := range m2.Members() {
if n.Name == c1.Name {
n1 = n
break
}
}
if n1 == nil {
t.Fatalf("bad")
}
if err := m2.SendToUDP(n1, []byte("SendToUDP")); err != nil {
t.Fatalf("err: %v", err)
}
if err := m2.SendToTCP(n1, []byte("SendToTCP")); err != nil {
t.Fatalf("err: %v", err)
}
if err := m2.SendBestEffort(n1, []byte("SendBestEffort")); err != nil {
t.Fatalf("err: %v", err)
}
if err := m2.SendReliable(n1, []byte("SendReliable")); err != nil {
t.Fatalf("err: %v", err)
}
time.Sleep(100 * time.Millisecond)
expected := []string{"SendTo", "SendToUDP", "SendToTCP", "SendBestEffort", "SendReliable"}
msgs1 := d1.getMessages()
received := make([]string, len(msgs1))
for i, bs := range msgs1 {
received[i] = string(bs)
}
// Some of these are UDP so often get re-ordered making the test flaky if we
// assert send ordering. Sort both slices to be tolerant of re-ordering.
require.ElementsMatch(t, expected, received)
}
type testCountingWriter struct {
t *testing.T
numCalls *int32
}
func (tw testCountingWriter) Write(p []byte) (n int, err error) {
atomic.AddInt32(tw.numCalls, 1)
if !strings.Contains(string(p), "memberlist: Error accepting TCP connection") {
tw.t.Error("did not receive expected log message")
}
tw.t.Log("countingWriter:", string(p))
return len(p), nil
}
// TestTransport_TcpListenBackoff tests that AcceptTCP() errors in NetTransport#tcpListen()
// do not result in a tight loop and spam the log. We verify this here by counting the number
// of entries logged in a given time period.
func TestTransport_TcpListenBackoff(t *testing.T) {
// testTime is the amount of time we will allow NetTransport#tcpListen() to run
// This needs to be long enough that to verify that maxDelay is in force,
// but not so long as to be obnoxious when running the test suite.
const testTime = 4 * time.Second
var numCalls int32
countingWriter := testCountingWriter{t, &numCalls}
countingLogger := log.New(countingWriter, "test", log.LstdFlags)
transport := NetTransport{
streamCh: make(chan net.Conn),
logger: countingLogger,
}
transport.wg.Add(1)
// create a listener that will cause AcceptTCP calls to fail
listener, _ := net.ListenTCP("tcp", nil)
listener.Close()
go transport.tcpListen(listener)
// sleep (+yield) for testTime seconds before asking the accept loop to shut down
time.Sleep(testTime)
atomic.StoreInt32(&transport.shutdown, 1)
// Verify that the wg was completed on exit (but without blocking this test)
// maxDelay == 1s, so we will give the routine 1.25s to loop around and shut down.
c := make(chan struct{})
go func() {
defer close(c)
transport.wg.Wait()
}()
select {
case <-c:
case <-time.After(1250 * time.Millisecond):
t.Error("timed out waiting for transport waitgroup to be done after flagging shutdown")
}
// In testTime==4s, we expect to loop approximately 12 times (and log approximately 11 errors),
// with the following delays (in ms):
// 0+5+10+20+40+80+160+320+640+1000+1000+1000 == 4275 ms
// Too few calls suggests that the minDelay is not in force; too many calls suggests that the
// maxDelay is not in force or that the back-off isn't working at all.
// We'll leave a little flex; the important thing here is the asymptotic behavior.
// If the minDelay or maxDelay in NetTransport#tcpListen() are modified, this test may fail
// and need to be adjusted.
require.True(t, numCalls > 8)
require.True(t, numCalls < 14)
// no connections should have been accepted and sent to the channel
require.Equal(t, len(transport.streamCh), 0)
}