File: //proc/self/root/opt/go/pkg/mod/github.com/hashicorp/
[email protected]/transport.go
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package memberlist
import (
"fmt"
"net"
"time"
)
// Packet carries the payload of a single inbound packet from a peer over a
// packet connection, together with metadata recorded about it.
type Packet struct {
	// Buf holds the packet's raw bytes.
	Buf []byte

	// From is the sending peer's address. It is kept as a real net.Addr so
	// that concrete details about the incoming packet can be exposed.
	From net.Addr

	// Timestamp records when the packet was received. Capture it as near to
	// the true arrival time as possible, since probes rely on it to measure
	// RTT accurately.
	Timestamp time.Time
}
// Transport abstracts how memberlist communicates with other peers. It offers
// two modes: a packet interface, assumed to be best-effort, and a stream
// interface, assumed to be reliable.
type Transport interface {
	// FinalAdvertiseAddr receives the user's configured values (which may be
	// empty) and returns the IP and port that should be advertised to the
	// rest of the cluster.
	FinalAdvertiseAddr(ip string, port int) (net.IP, int, error)

	// WriteTo fires off payload b to addr in a connectionless,
	// packet-oriented fashion. The returned time stamp should be as close as
	// possible to the moment the packet was transmitted, so that probes can
	// make accurate RTT measurements.
	//
	// This resembles net.PacketConn, but that full method set is
	// deliberately not required, keeping assumptions about the underlying
	// plumbing to a minimum. The address is a string, as with Dial, so it
	// stays network neutral; it is usually of the form "host:port".
	WriteTo(b []byte, addr string) (time.Time, error)

	// PacketCh returns a channel from which incoming packets from other
	// peers can be read. How listening is set up is left to the concrete
	// transport implementation.
	PacketCh() <-chan *Packet

	// DialTimeout creates a connection for two-way communication with a
	// peer. This is generally more expensive than a packet connection, so it
	// is used for less frequent operations such as anti-entropy, or for
	// fallback probes when the packet-oriented probe failed.
	DialTimeout(addr string, timeout time.Duration) (net.Conn, error)

	// StreamCh returns a channel from which incoming stream connections from
	// other peers can be read. How listening is set up is left to the
	// concrete transport implementation.
	StreamCh() <-chan net.Conn

	// Shutdown is invoked while memberlist is shutting down, giving the
	// transport a chance to clean up any listeners.
	Shutdown() error
}
// Address identifies a peer by its network address and, optionally, by the
// name of the node.
type Address struct {
	// Addr is the network address in string form, similar to what Dial
	// takes; usually "host:port". Required.
	Addr string

	// Name is the name of the node being addressed. Optional, although
	// transports may require it.
	Name string
}

// String formats the address for display: "name (addr)" when Name is set,
// otherwise just Addr.
func (a *Address) String() string {
	if a.Name == "" {
		return a.Addr
	}
	return fmt.Sprintf("%s (%s)", a.Name, a.Addr)
}
// IngestionAwareTransport is unused.
//
// Deprecated: nothing uses IngestionAwareTransport and a future version may
// remove it. Declare an equivalent interface locally rather than depending on
// this exported one.
type IngestionAwareTransport interface {
	IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error
	IngestStream(conn net.Conn) error
}
// NodeAwareTransport is a Transport that additionally offers Address-based
// counterparts of WriteTo and DialTimeout, so that a node name can accompany
// the network address of the peer being contacted.
type NodeAwareTransport interface {
	Transport

	// WriteToAddress is the Address-taking counterpart of WriteTo.
	WriteToAddress(b []byte, addr Address) (time.Time, error)

	// DialAddressTimeout is the Address-taking counterpart of DialTimeout.
	DialAddressTimeout(addr Address, timeout time.Duration) (net.Conn, error)
}
// shimNodeAwareTransport adapts a plain Transport to NodeAwareTransport. Its
// Address-based methods forward to the string-address methods of the embedded
// Transport, passing along only Address.Addr.
type shimNodeAwareTransport struct {
	Transport
}

// Compile-time check that the shim satisfies NodeAwareTransport.
var _ NodeAwareTransport = (*shimNodeAwareTransport)(nil)
// WriteToAddress sends b to addr.Addr through the embedded Transport's
// WriteTo and returns its result unchanged. addr.Name is not used.
func (t *shimNodeAwareTransport) WriteToAddress(b []byte, addr Address) (time.Time, error) {
	target := addr.Addr
	return t.Transport.WriteTo(b, target)
}
// DialAddressTimeout dials addr.Addr through the embedded Transport's
// DialTimeout and returns its result unchanged. addr.Name is not used.
func (t *shimNodeAwareTransport) DialAddressTimeout(addr Address, timeout time.Duration) (net.Conn, error) {
	target := addr.Addr
	return t.Transport.DialTimeout(target, timeout)
}
// labelWrappedTransport decorates a NodeAwareTransport: its write and dial
// methods add a label header, built from label, to outgoing packets and to
// newly dialed streams before handing them on.
type labelWrappedTransport struct {
	// label is the value passed to the label-header helpers.
	label string

	NodeAwareTransport
}

// Compile-time check that the wrapper satisfies NodeAwareTransport.
var _ NodeAwareTransport = (*labelWrappedTransport)(nil)
// WriteToAddress passes buf through AddLabelHeaderToPacket with t.label and
// sends the result to addr via the wrapped transport. If the header cannot be
// added, nothing is sent and a zero time is returned with the wrapped error.
func (t *labelWrappedTransport) WriteToAddress(buf []byte, addr Address) (time.Time, error) {
	labeled, err := AddLabelHeaderToPacket(buf, t.label)
	if err != nil {
		return time.Time{}, fmt.Errorf("failed to add label header to packet: %w", err)
	}
	return t.NodeAwareTransport.WriteToAddress(labeled, addr)
}
// WriteTo passes buf through AddLabelHeaderToPacket with t.label and sends
// the result to addr via the wrapped transport. If the header cannot be
// added, nothing is sent and a zero time is returned with the wrapped error.
func (t *labelWrappedTransport) WriteTo(buf []byte, addr string) (time.Time, error) {
	labeled, err := AddLabelHeaderToPacket(buf, t.label)
	if err != nil {
		// Wrap with the same context WriteToAddress uses, so that a header
		// failure reads the same whichever write path produced it. %w keeps
		// the underlying error reachable through errors.Is / errors.As.
		return time.Time{}, fmt.Errorf("failed to add label header to packet: %w", err)
	}
	return t.NodeAwareTransport.WriteTo(labeled, addr)
}
// DialAddressTimeout dials addr through the wrapped transport and then calls
// AddLabelHeaderToStream on the new connection with t.label.
//
// A dial error is returned as-is. If adding the label header fails, the
// connection is closed and a wrapped error is returned; the returned
// net.Conn is non-nil only on success.
func (t *labelWrappedTransport) DialAddressTimeout(addr Address, timeout time.Duration) (net.Conn, error) {
	conn, err := t.NodeAwareTransport.DialAddressTimeout(addr, timeout)
	if err != nil {
		return nil, err
	}
	if err := AddLabelHeaderToStream(conn, t.label); err != nil {
		// The caller receives a nil conn on this path and cannot close it,
		// so close it here to avoid leaking the connection. The Close error
		// is dropped on purpose: the header failure is the one to report.
		_ = conn.Close()
		return nil, fmt.Errorf("failed to add label header to stream: %w", err)
	}
	return conn, nil
}
// DialTimeout dials addr through the wrapped transport and then calls
// AddLabelHeaderToStream on the new connection with t.label.
//
// A dial error is returned as-is. If adding the label header fails, the
// connection is closed and a wrapped error is returned; the returned
// net.Conn is non-nil only on success.
func (t *labelWrappedTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
	conn, err := t.NodeAwareTransport.DialTimeout(addr, timeout)
	if err != nil {
		return nil, err
	}
	if err := AddLabelHeaderToStream(conn, t.label); err != nil {
		// The caller receives a nil conn on this path and cannot close it,
		// so close it here to avoid leaking the connection. The Close error
		// is dropped on purpose: the header failure is the one to report.
		_ = conn.Close()
		return nil, fmt.Errorf("failed to add label header to stream: %w", err)
	}
	return conn, nil
}