diff --git a/tstest/natlab/natlab.go b/tstest/natlab/natlab.go index 4f79e0e75..8933cf897 100644 --- a/tstest/natlab/natlab.go +++ b/tstest/natlab/natlab.go @@ -15,6 +15,7 @@ "context" "fmt" "net" + "sort" "strconv" "sync" "time" @@ -42,6 +43,10 @@ func NewInternet() *Network { return &Network{ v4Pool: mustPrefix("203.0.113.0/24"), // documentation netblock that looks Internet-y v6Pool: mustPrefix("fc00:52::/64"), + pushRoutes: []netaddr.IPPrefix{ + mustPrefix("0.0.0.0/0"), + mustPrefix("::/0"), + }, } } @@ -50,7 +55,7 @@ type Network struct { v4Pool netaddr.IPPrefix v6Pool netaddr.IPPrefix - pushRoute netaddr.IPPrefix + pushRoutes []netaddr.IPPrefix mu sync.Mutex machine map[netaddr.IP]*Machine @@ -58,7 +63,17 @@ type Network struct { lastV6 netaddr.IP } -func (n *Network) AllocIPv4() netaddr.IP { +func (n *Network) addMachineLocked(ip netaddr.IP, m *Machine) { + if m == nil { + return // for tests + } + if n.machine == nil { + n.machine = map[netaddr.IP]*Machine{} + } + n.machine[ip] = m +} + +func (n *Network) allocIPv4(m *Machine) netaddr.IP { n.mu.Lock() defer n.mu.Unlock() if n.lastV4.IsZero() { @@ -70,10 +85,11 @@ func (n *Network) AllocIPv4() netaddr.IP { if !n.v4Pool.Contains(n.lastV4) { panic("pool exhausted") } + n.addMachineLocked(n.lastV4, m) return n.lastV4 } -func (n *Network) AllocIPv6() netaddr.IP { +func (n *Network) allocIPv6(m *Machine) netaddr.IP { n.mu.Lock() defer n.mu.Unlock() if n.lastV6.IsZero() { @@ -85,6 +101,7 @@ func (n *Network) AllocIPv6() netaddr.IP { if !n.v6Pool.Contains(n.lastV6) { panic("pool exhausted") } + n.addMachineLocked(n.lastV6, m) return n.lastV6 } @@ -98,7 +115,19 @@ func addOne(a *[16]byte, index int) { } func (n *Network) write(p []byte, dst, src netaddr.IPPort) (num int, err error) { - panic("TODO") + n.mu.Lock() + defer n.mu.Unlock() + m, ok := n.machine[dst.IP] + if !ok { + // TODO: queue? hang forever? return success? don't fail fast probably. + return 0, fmt.Errorf("unknown dest IP %v", dst.IP) + } + + // Pretend it went across the network. Make a copy so nobody + // can later mess with caller's memory. + pcopy := append([]byte(nil), p...) + go m.deliverIncomingPacket(pcopy, dst, src) + return len(p), nil } type Interface struct { @@ -164,18 +193,65 @@ type Machine struct { conns map[netaddr.IPPort]*conn } -// Attach +func (m *Machine) deliverIncomingPacket(p []byte, dst, src netaddr.IPPort) { + m.mu.Lock() + defer m.mu.Unlock() + + // TODO(danderson): check behavior of dual stack sockets + c, ok := m.conns[dst] + if !ok { + dst = netaddr.IPPort{IP: unspecOf(dst.IP), Port: dst.Port} + c, ok = m.conns[dst] + if !ok { + return + } + } + + select { + case c.in <- incomingPacket{src: src, p: p}: + default: + // Queue overflow. Just drop it. + } +} + +func unspecOf(ip netaddr.IP) netaddr.IP { + if ip.Is4() { + return v4unspec + } + if ip.Is6() { + return v6unspec + } + panic(fmt.Sprintf("bogus IP %#v", ip)) +} + +// Attach adds an interface to a machine. func (m *Machine) Attach(interfaceName string, n *Network) *Interface { f := &Interface{ net: n, name: interfaceName, } - // TODO: get f.ips, routes + if ip := n.allocIPv4(m); !ip.IsZero() { + f.ips = append(f.ips, ip) + } + if ip := n.allocIPv6(m); !ip.IsZero() { + f.ips = append(f.ips, ip) + } m.mu.Lock() defer m.mu.Unlock() + m.interfaces = append(m.interfaces, f) + for _, pfx := range n.pushRoutes { + m.routes = append(m.routes, routeEntry{ + prefix: pfx, + iface: f, + }) + } + sort.Slice(m.routes, func(i, j int) bool { + return m.routes[i].prefix.Bits > m.routes[j].prefix.Bits + }) + return f } @@ -237,6 +313,9 @@ func (m *Machine) registerConn(c *conn) error { if _, ok := m.conns[c.ipp]; ok { return fmt.Errorf("duplicate conn listening on %v", c.ipp) } + if m.conns == nil { + m.conns = map[netaddr.IPPort]*conn{} + } m.conns[c.ipp] = c return nil } @@ -287,6 +366,7 @@ func (m *Machine) ListenPacket(network, address string) (net.PacketConn, error) m: m, fam: fam, ipp: ipp, + in: make(chan incomingPacket, 100), // arbitrary } if err := m.registerConn(c); err != nil { return nil, err @@ -304,6 +384,12 @@ type conn struct { closed bool readDeadline time.Time activeReads map[*activeRead]bool + in chan incomingPacket +} + +type incomingPacket struct { + p []byte + src netaddr.IPPort } type activeRead struct { @@ -367,7 +453,9 @@ func (c *conn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { defer c.registerActiveRead(ar, false) select { - // TODO: select on getting data + case pkt := <-c.in: + n = copy(p, pkt.p) + return n, pkt.src.UDPAddr(), nil case <-ctx.Done(): return 0, nil, context.DeadlineExceeded }