xbee: added direct conn

This commit is contained in:
saji 2023-05-10 23:56:45 -05:00
parent d31b80a2fd
commit 08ab0e3509

View file

@ -108,8 +108,14 @@ func (sess *Session) rxHandler() {
sess.log.Warn("error parsing rx packet", "error", err, "data", data)
break //continue?
}
// take the data and write it to our internal rx packet buffer.
_, err = sess.rxBuf.Write(frame.Payload)
// write it to either the connection or the default buffer.
if c, ok := sess.conns[frame.Source]; ok {
_, err = c.rxBuf.Write(frame.Payload)
} else {
_, err = sess.rxBuf.Write(frame.Payload)
}
if err != nil {
sess.log.Warn("error writing data", "error", err, "payload", frame.Payload)
}
@ -150,7 +156,8 @@ func (sess *Session) Write(p []byte) (int, error) {
}
// internal function used by Conn to write data to a specific address.
// internal function used to write data to a specific address.
// The Write() call uses 0xFFFF (broadcast address).
func (sess *Session) writeAddr(p []byte, dest uint64) (n int, err error) {
idx, ch, err := sess.ct.GetMark()
@ -175,10 +182,17 @@ func (sess *Session) writeAddr(p []byte, dest uint64) (n int, err error) {
// finally, wait for the channel we got to return. this means that
// the matching response frame was received, so we can parse it.
// TODO: add timeout.
responseFrame := <-ch
var status *TxStatusFrame
select {
case responseFrame := <-ch:
status, err = ParseTxStatusFrame(responseFrame)
case <-time.After(1 * time.Second):
return n, errors.New("timeout waiting for response")
}
// this is a tx status frame bytes, so lets parse it out.
status, err := ParseTxStatusFrame(responseFrame)
if err != nil {
return
}
@ -194,7 +208,7 @@ func (sess *Session) writeAddr(p []byte, dest uint64) (n int, err error) {
// instead, an AC command must be set to apply the queued changes. `queued` does not
// affect query-type commands, which always return right away.
// the AT command is an interface.
func (sess *Session) ATCommand(cmd [2]rune, data []byte, queued bool) ([]byte, error) {
func (sess *Session) ATCommand(cmd [2]rune, data []byte, queued bool) (payload []byte, err error) {
// we must encode the command, and then create the actual packet.
// then we send the packet, and wait for the response
// TODO: how to handle multiple-response-packet AT commands?
@ -216,21 +230,21 @@ func (sess *Session) ATCommand(cmd [2]rune, data []byte, queued bool) ([]byte, e
return nil, fmt.Errorf("error writing xbee frame: %w", err)
}
// TODO: add timeout.
var respBytes []byte
var resp *ATCmdResponse
select {
case respBytes = <-ch:
case b := <-ch:
resp, err = ParseATCmdResponse(b)
case <-time.After(1 * time.Second):
return nil, errors.New("timeout waiting for response frame")
}
resp, err := ParseATCmdResponse(respBytes)
if err != nil {
return nil, err
}
if resp.Status != 0 {
// sinec ATCmdStatus is a stringer thanks to the generator
return resp.Data, fmt.Errorf("AT command failed: %v", resp.Status)
}
@ -261,13 +275,29 @@ func (sess *Session) RemoteAddr() XBeeAddr {
return 0xFFFF
}
func (sess *Session) Dial(addr uint64) (conn *Conn, err error) {
if _, exist := sess.conns[addr]; exist {
return nil, errors.New("address already in use")
}
rd, wr := io.Pipe()
conn.rxBuf = bufio.NewReadWriter(bufio.NewReader(rd), bufio.NewWriter(wr))
conn.addr = XBeeAddr(addr)
conn.parent = sess
// add it to the list
sess.conns[addr] = conn
return
}
/*
The session implements a io.Writer and io.Reader, but does not
have a way of connecting to a specific XBee by default. To do this, we would
need to either pass an address to the write and read methods (breaking io.ReadWriter),
or add another command. Rather than do that, we can make a "Conn" class, which represents
a single connection to a device on the network.
*/
// Conn is a connection to a specific remote XBee. Conn allows for the user to
@ -276,13 +306,21 @@ a single connection to a device on the network.
type Conn struct {
parent *Session
addr XBeeAddr
// data is written here by session rxHandler
rxBuf *bufio.ReadWriter
}
func (c *Conn) Write(p []byte) (int, error) {
func (c *Conn) Write(p []byte) (n int, err error) {
return c.parent.writeAddr(p, uint64(c.addr))
}
func (c *Conn) Read(p []byte) (n int, err error) {
return c.rxBuf.Read(p)
}
func (c *Conn) Close() error {
// remove ourselves from the conn list.
return nil
}