diff --git a/src/admin/admin.go b/src/admin/admin.go index 7cf5f330..27dabdcc 100644 --- a/src/admin/admin.go +++ b/src/admin/admin.go @@ -1,12 +1,14 @@ package admin import ( + "encoding/hex" "encoding/json" "errors" "fmt" "net" "net/url" "os" + "strconv" "strings" "time" @@ -166,132 +168,131 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log. } return Info{"sessions": sessions}, nil }) - /* - a.AddHandler("addPeer", []string{"uri", "[interface]"}, func(in Info) (Info, error) { - // Set sane defaults - intf := "" - // Has interface been specified? - if itf, ok := in["interface"]; ok { - intf = itf.(string) - } - if a.addPeer(in["uri"].(string), intf) == nil { - return Info{ - "added": []string{ - in["uri"].(string), - }, - }, nil - } else { - return Info{ - "not_added": []string{ - in["uri"].(string), - }, - }, errors.New("Failed to add peer") - } - }) - a.AddHandler("removePeer", []string{"port"}, func(in Info) (Info, error) { - if a.removePeer(fmt.Sprint(in["port"])) == nil { - return Info{ - "removed": []string{ - fmt.Sprint(in["port"]), - }, - }, nil - } else { - return Info{ - "not_removed": []string{ - fmt.Sprint(in["port"]), - }, - }, errors.New("Failed to remove peer") - } - }) - a.AddHandler("getAllowedEncryptionPublicKeys", []string{}, func(in Info) (Info, error) { - return Info{"allowed_box_pubs": a.getAllowedEncryptionPublicKeys()}, nil - }) - a.AddHandler("addAllowedEncryptionPublicKey", []string{"box_pub_key"}, func(in Info) (Info, error) { - if a.addAllowedEncryptionPublicKey(in["box_pub_key"].(string)) == nil { - return Info{ - "added": []string{ - in["box_pub_key"].(string), - }, - }, nil - } else { - return Info{ - "not_added": []string{ - in["box_pub_key"].(string), - }, - }, errors.New("Failed to add allowed key") - } - }) - a.AddHandler("removeAllowedEncryptionPublicKey", []string{"box_pub_key"}, func(in Info) (Info, error) { - if a.removeAllowedEncryptionPublicKey(in["box_pub_key"].(string)) == nil { - return Info{ - "removed": []string{ - in["box_pub_key"].(string), - }, - }, nil - } else { - return Info{ - "not_removed": []string{ - in["box_pub_key"].(string), - }, - }, errors.New("Failed to remove allowed key") - } - }) - a.AddHandler("dhtPing", []string{"box_pub_key", "coords", "[target]"}, func(in Info) (Info, error) { - if in["target"] == nil { - in["target"] = "none" - } - result, err := a.admin_dhtPing(in["box_pub_key"].(string), in["coords"].(string), in["target"].(string)) - if err == nil { - infos := make(map[string]map[string]string, len(result.Infos)) - for _, dinfo := range result.Infos { - info := map[string]string{ - "box_pub_key": hex.EncodeToString(dinfo.key[:]), - "coords": fmt.Sprintf("%v", dinfo.coords), - } - addr := net.IP(address.AddrForNodeID(crypto.GetNodeID(&dinfo.key))[:]).String() - infos[addr] = info + a.AddHandler("addPeer", []string{"uri", "[interface]"}, func(in Info) (Info, error) { + // Set sane defaults + intf := "" + // Has interface been specified? + if itf, ok := in["interface"]; ok { + intf = itf.(string) + } + if a.core.AddPeer(in["uri"].(string), intf) == nil { + return Info{ + "added": []string{ + in["uri"].(string), + }, + }, nil + } else { + return Info{ + "not_added": []string{ + in["uri"].(string), + }, + }, errors.New("Failed to add peer") + } + }) + a.AddHandler("removePeer", []string{"port"}, func(in Info) (Info, error) { + port, err := strconv.ParseInt(fmt.Sprint(in["port"]), 10, 64) + if err != nil { + return Info{}, err + } + if a.core.DisconnectPeer(uint64(port)) == nil { + return Info{ + "removed": []string{ + fmt.Sprint(port), + }, + }, nil + } else { + return Info{ + "not_removed": []string{ + fmt.Sprint(port), + }, + }, errors.New("Failed to remove peer") + } + }) + a.AddHandler("getAllowedEncryptionPublicKeys", []string{}, func(in Info) (Info, error) { + return Info{"allowed_box_pubs": a.core.GetAllowedEncryptionPublicKeys()}, nil + }) + a.AddHandler("addAllowedEncryptionPublicKey", []string{"box_pub_key"}, func(in Info) (Info, error) { + if a.core.AddAllowedEncryptionPublicKey(in["box_pub_key"].(string)) == nil { + return Info{ + "added": []string{ + in["box_pub_key"].(string), + }, + }, nil + } else { + return Info{ + "not_added": []string{ + in["box_pub_key"].(string), + }, + }, errors.New("Failed to add allowed key") + } + }) + a.AddHandler("removeAllowedEncryptionPublicKey", []string{"box_pub_key"}, func(in Info) (Info, error) { + if a.core.RemoveAllowedEncryptionPublicKey(in["box_pub_key"].(string)) == nil { + return Info{ + "removed": []string{ + in["box_pub_key"].(string), + }, + }, nil + } else { + return Info{ + "not_removed": []string{ + in["box_pub_key"].(string), + }, + }, errors.New("Failed to remove allowed key") + } + }) + a.AddHandler("dhtPing", []string{"box_pub_key", "coords", "[target]"}, func(in Info) (Info, error) { + if in["target"] == nil { + in["target"] = "none" + } + result, err := a.core.DHTPing(in["box_pub_key"].(string), in["coords"].(string), in["target"].(string)) + if err == nil { + infos := make(map[string]map[string]string, len(result.Infos)) + for _, dinfo := range result.Infos { + info := map[string]string{ + "box_pub_key": hex.EncodeToString(dinfo.PublicKey[:]), + "coords": fmt.Sprintf("%v", dinfo.Coords), } - return Info{"nodes": infos}, nil + addr := net.IP(address.AddrForNodeID(crypto.GetNodeID(&dinfo.PublicKey))[:]).String() + infos[addr] = info + } + return Info{"nodes": infos}, nil + } else { + return Info{}, err + } + }) + a.AddHandler("getNodeInfo", []string{"[box_pub_key]", "[coords]", "[nocache]"}, func(in Info) (Info, error) { + var nocache bool + if in["nocache"] != nil { + nocache = in["nocache"].(string) == "true" + } + var box_pub_key, coords string + if in["box_pub_key"] == nil && in["coords"] == nil { + nodeinfo := a.core.MyNodeInfo() + var jsoninfo interface{} + if err := json.Unmarshal(nodeinfo, &jsoninfo); err != nil { + return Info{}, err + } else { + return Info{"nodeinfo": jsoninfo}, nil + } + } else if in["box_pub_key"] == nil || in["coords"] == nil { + return Info{}, errors.New("Expecting both box_pub_key and coords") + } else { + box_pub_key = in["box_pub_key"].(string) + coords = in["coords"].(string) + } + result, err := a.core.GetNodeInfo(box_pub_key, coords, nocache) + if err == nil { + var m map[string]interface{} + if err = json.Unmarshal(result, &m); err == nil { + return Info{"nodeinfo": m}, nil } else { return Info{}, err } - }) - a.AddHandler("getNodeInfo", []string{"[box_pub_key]", "[coords]", "[nocache]"}, func(in Info) (Info, error) { - var nocache bool - if in["nocache"] != nil { - nocache = in["nocache"].(string) == "true" - } - var box_pub_key, coords string - if in["box_pub_key"] == nil && in["coords"] == nil { - var nodeinfo []byte - a.core.router.doAdmin(func() { - nodeinfo = []byte(a.core.router.nodeinfo.getNodeInfo()) - }) - var jsoninfo interface{} - if err := json.Unmarshal(nodeinfo, &jsoninfo); err != nil { - return Info{}, err - } else { - return Info{"nodeinfo": jsoninfo}, nil - } - } else if in["box_pub_key"] == nil || in["coords"] == nil { - return Info{}, errors.New("Expecting both box_pub_key and coords") - } else { - box_pub_key = in["box_pub_key"].(string) - coords = in["coords"].(string) - } - result, err := a.admin_getNodeInfo(box_pub_key, coords, nocache) - if err == nil { - var m map[string]interface{} - if err = json.Unmarshal(result, &m); err == nil { - return Info{"nodeinfo": m}, nil - } else { - return Info{}, err - } - } else { - return Info{}, err - } - }) - */ + } else { + return Info{}, err + } + }) } // start runs the admin API socket to listen for / respond to admin API calls. @@ -458,112 +459,6 @@ func (a *AdminSocket) handleRequest(conn net.Conn) { } } -/* -// Send a DHT ping to the node with the provided key and coords, optionally looking up the specified target NodeID. -func (a *AdminSocket) admin_dhtPing(keyString, coordString, targetString string) (dhtRes, error) { - var key crypto.BoxPubKey - if keyBytes, err := hex.DecodeString(keyString); err != nil { - return dhtRes{}, err - } else { - copy(key[:], keyBytes) - } - var coords []byte - for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") { - if cstr == "" { - // Special case, happens if trimmed is the empty string, e.g. this is the root - continue - } - if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil { - return dhtRes{}, err - } else { - coords = append(coords, uint8(u64)) - } - } - resCh := make(chan *dhtRes, 1) - info := dhtInfo{ - key: key, - coords: coords, - } - target := *info.getNodeID() - if targetString == "none" { - // Leave the default target in place - } else if targetBytes, err := hex.DecodeString(targetString); err != nil { - return dhtRes{}, err - } else if len(targetBytes) != len(target) { - return dhtRes{}, errors.New("Incorrect target NodeID length") - } else { - var target crypto.NodeID - copy(target[:], targetBytes) - } - rq := dhtReqKey{info.key, target} - sendPing := func() { - a.core.dht.addCallback(&rq, func(res *dhtRes) { - defer func() { recover() }() - select { - case resCh <- res: - default: - } - }) - a.core.dht.ping(&info, &target) - } - a.core.router.doAdmin(sendPing) - go func() { - time.Sleep(6 * time.Second) - close(resCh) - }() - for res := range resCh { - return *res, nil - } - return dhtRes{}, errors.New(fmt.Sprintf("DHT ping timeout: %s", keyString)) -} - -func (a *AdminSocket) admin_getNodeInfo(keyString, coordString string, nocache bool) (nodeinfoPayload, error) { - var key crypto.BoxPubKey - if keyBytes, err := hex.DecodeString(keyString); err != nil { - return nodeinfoPayload{}, err - } else { - copy(key[:], keyBytes) - } - if !nocache { - if response, err := a.core.router.nodeinfo.getCachedNodeInfo(key); err == nil { - return response, nil - } - } - var coords []byte - for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") { - if cstr == "" { - // Special case, happens if trimmed is the empty string, e.g. this is the root - continue - } - if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil { - return nodeinfoPayload{}, err - } else { - coords = append(coords, uint8(u64)) - } - } - response := make(chan *nodeinfoPayload, 1) - sendNodeInfoRequest := func() { - a.core.router.nodeinfo.addCallback(key, func(nodeinfo *nodeinfoPayload) { - defer func() { recover() }() - select { - case response <- nodeinfo: - default: - } - }) - a.core.router.nodeinfo.sendNodeInfo(key, coords, false) - } - a.core.router.doAdmin(sendNodeInfoRequest) - go func() { - time.Sleep(6 * time.Second) - close(response) - }() - for res := range response { - return *res, nil - } - return nodeinfoPayload{}, errors.New(fmt.Sprintf("getNodeInfo timeout: %s", keyString)) -} -*/ - // getResponse_dot returns a response for a graphviz dot formatted // representation of the known parts of the network. This is color-coded and // labeled, and includes the self node, switch peers, nodes known to the DHT, diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index e7050c7b..40e5a2b2 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -3,8 +3,11 @@ package yggdrasil import ( "encoding/hex" "errors" + "fmt" "net" "sort" + "strconv" + "strings" "sync/atomic" "time" @@ -47,6 +50,17 @@ type DHTEntry struct { LastSeen time.Duration } +// DHTRes represents a DHT response, as returned by DHTPing. +type DHTRes struct { + PublicKey crypto.BoxPubKey // key of the sender + Coords []byte // coords of the sender + Dest crypto.NodeID // the destination node ID + Infos []DHTEntry // response +} + +// NodeInfoPayload represents a RequestNodeInfo response, in bytes. +type NodeInfoPayload nodeinfoPayload + // SwitchQueues represents information from the switch related to link // congestion and a list of switch queues created in response to congestion on a // given link. @@ -123,7 +137,7 @@ func (c *Core) GetSwitchPeers() []SwitchPeer { } coords := elem.locator.getCoords() info := SwitchPeer{ - Coords: coords, + Coords: append([]byte{}, coords...), BytesSent: atomic.LoadUint64(&peer.bytesSent), BytesRecvd: atomic.LoadUint64(&peer.bytesRecvd), Port: uint64(elem.port), @@ -151,7 +165,7 @@ func (c *Core) GetDHT() []DHTEntry { }) for _, v := range dhtentry { info := DHTEntry{ - Coords: v.coords, + Coords: append([]byte{}, v.coords...), LastSeen: now.Sub(v.recv), } copy(info.PublicKey[:], v.key[:]) @@ -198,7 +212,7 @@ func (c *Core) GetSessions() []Session { for _, sinfo := range c.sessions.sinfos { // TODO? skipped known but timed out sessions? session := Session{ - Coords: sinfo.coords, + Coords: append([]byte{}, sinfo.coords...), MTU: sinfo.getMTU(), BytesSent: sinfo.bytesSent, BytesRecvd: sinfo.bytesRecvd, @@ -319,7 +333,7 @@ func (c *Core) RouterAddresses() (address.Address, address.Subnet) { } // NodeInfo gets the currently configured nodeinfo. -func (c *Core) NodeInfo() nodeinfoPayload { +func (c *Core) MyNodeInfo() nodeinfoPayload { return c.router.nodeinfo.getNodeInfo() } @@ -329,6 +343,56 @@ func (c *Core) SetNodeInfo(nodeinfo interface{}, nodeinfoprivacy bool) { c.router.nodeinfo.setNodeInfo(nodeinfo, nodeinfoprivacy) } +// GetNodeInfo requests nodeinfo from a remote node, as specified by the public +// key and coordinates specified. The third parameter specifies whether a cached +// result is acceptable - this results in less traffic being generated than is +// necessary when, e.g. crawling the network. +func (c *Core) GetNodeInfo(keyString, coordString string, nocache bool) (NodeInfoPayload, error) { + var key crypto.BoxPubKey + if keyBytes, err := hex.DecodeString(keyString); err != nil { + return NodeInfoPayload{}, err + } else { + copy(key[:], keyBytes) + } + if !nocache { + if response, err := c.router.nodeinfo.getCachedNodeInfo(key); err == nil { + return NodeInfoPayload(response), nil + } + } + var coords []byte + for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") { + if cstr == "" { + // Special case, happens if trimmed is the empty string, e.g. this is the root + continue + } + if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil { + return NodeInfoPayload{}, err + } else { + coords = append(coords, uint8(u64)) + } + } + response := make(chan *nodeinfoPayload, 1) + sendNodeInfoRequest := func() { + c.router.nodeinfo.addCallback(key, func(nodeinfo *nodeinfoPayload) { + defer func() { recover() }() + select { + case response <- nodeinfo: + default: + } + }) + c.router.nodeinfo.sendNodeInfo(key, coords, false) + } + c.router.doAdmin(sendNodeInfoRequest) + go func() { + time.Sleep(6 * time.Second) + close(response) + }() + for res := range response { + return NodeInfoPayload(*res), nil + } + return NodeInfoPayload{}, errors.New(fmt.Sprintf("getNodeInfo timeout: %s", keyString)) +} + // SetLogger sets the output logger of the Yggdrasil node after startup. This // may be useful if you want to redirect the output later. func (c *Core) SetLogger(log *log.Logger) { @@ -354,6 +418,14 @@ func (c *Core) AddPeer(addr string, sintf string) error { return nil } +// RemovePeer is not implemented yet. +func (c *Core) RemovePeer(addr string, sintf string) error { + // TODO: Implement a reverse of AddPeer, where we look up the port number + // based on the addr and sintf, disconnect it and then remove it from the + // peers list so we don't reconnect to it later + return errors.New("not implemented") +} + // CallPeer calls a peer once. This should be specified in the peer URI format, // e.g.: // tcp://a.b.c.d:e @@ -364,9 +436,99 @@ func (c *Core) CallPeer(addr string, sintf string) error { return c.link.call(addr, sintf) } -// AddAllowedEncryptionPublicKey adds an allowed public key. This allow peerings -// to be restricted only to keys that you have selected. -func (c *Core) AddAllowedEncryptionPublicKey(boxStr string) error { - //return c.admin.addAllowedEncryptionPublicKey(boxStr) +// DisconnectPeer disconnects a peer once. This should be specified as a port +// number. +func (c *Core) DisconnectPeer(port uint64) error { + c.peers.removePeer(switchPort(port)) return nil } + +// GetAllowedEncryptionPublicKeys returns the public keys permitted for incoming +// peer connections. +func (c *Core) GetAllowedEncryptionPublicKeys() []string { + return c.peers.getAllowedEncryptionPublicKeys() +} + +// AddAllowedEncryptionPublicKey whitelists a key for incoming peer connections. +func (c *Core) AddAllowedEncryptionPublicKey(bstr string) (err error) { + c.peers.addAllowedEncryptionPublicKey(bstr) + return nil +} + +// RemoveAllowedEncryptionPublicKey removes a key from the whitelist for +// incoming peer connections. If none are set, an empty list permits all +// incoming connections. +func (c *Core) RemoveAllowedEncryptionPublicKey(bstr string) (err error) { + c.peers.removeAllowedEncryptionPublicKey(bstr) + return nil +} + +// Send a DHT ping to the node with the provided key and coords, optionally looking up the specified target NodeID. +func (c *Core) DHTPing(keyString, coordString, targetString string) (DHTRes, error) { + var key crypto.BoxPubKey + if keyBytes, err := hex.DecodeString(keyString); err != nil { + return DHTRes{}, err + } else { + copy(key[:], keyBytes) + } + var coords []byte + for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") { + if cstr == "" { + // Special case, happens if trimmed is the empty string, e.g. this is the root + continue + } + if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil { + return DHTRes{}, err + } else { + coords = append(coords, uint8(u64)) + } + } + resCh := make(chan *dhtRes, 1) + info := dhtInfo{ + key: key, + coords: coords, + } + target := *info.getNodeID() + if targetString == "none" { + // Leave the default target in place + } else if targetBytes, err := hex.DecodeString(targetString); err != nil { + return DHTRes{}, err + } else if len(targetBytes) != len(target) { + return DHTRes{}, errors.New("Incorrect target NodeID length") + } else { + var target crypto.NodeID + copy(target[:], targetBytes) + } + rq := dhtReqKey{info.key, target} + sendPing := func() { + c.dht.addCallback(&rq, func(res *dhtRes) { + defer func() { recover() }() + select { + case resCh <- res: + default: + } + }) + c.dht.ping(&info, &target) + } + c.router.doAdmin(sendPing) + go func() { + time.Sleep(6 * time.Second) + close(resCh) + }() + // TODO: do something better than the below... + for res := range resCh { + r := DHTRes{ + Coords: append([]byte{}, res.Coords...), + } + copy(r.PublicKey[:], res.Key[:]) + for _, i := range res.Infos { + e := DHTEntry{ + Coords: append([]byte{}, i.coords...), + } + copy(e.PublicKey[:], i.key[:]) + r.Infos = append(r.Infos, e) + } + return r, nil + } + return DHTRes{}, errors.New(fmt.Sprintf("DHT ping timeout: %s", keyString)) +}