diff --git a/xbee/session.go b/xbee/session.go index b64c046..f6d6b78 100644 --- a/xbee/session.go +++ b/xbee/session.go @@ -108,8 +108,14 @@ func (sess *Session) rxHandler() { sess.log.Warn("error parsing rx packet", "error", err, "data", data) break //continue? } - // take the data and write it to our internal rx packet buffer. - _, err = sess.rxBuf.Write(frame.Payload) + + // write it to either the connection or the default buffer. + if c, ok := sess.conns[frame.Source]; ok { + _, err = c.rxBuf.Write(frame.Payload) + } else { + _, err = sess.rxBuf.Write(frame.Payload) + } + if err != nil { sess.log.Warn("error writing data", "error", err, "payload", frame.Payload) } @@ -150,7 +156,8 @@ func (sess *Session) Write(p []byte) (int, error) { } -// internal function used by Conn to write data to a specific address. +// internal function used to write data to a specific address. +// The Write() call uses 0xFFFF (broadcast address). func (sess *Session) writeAddr(p []byte, dest uint64) (n int, err error) { idx, ch, err := sess.ct.GetMark() @@ -175,10 +182,17 @@ func (sess *Session) writeAddr(p []byte, dest uint64) (n int, err error) { // finally, wait for the channel we got to return. this means that // the matching response frame was received, so we can parse it. // TODO: add timeout. - responseFrame := <-ch + + var status *TxStatusFrame + select { + case responseFrame := <-ch: + status, err = ParseTxStatusFrame(responseFrame) + case <-time.After(1 * time.Second): + return n, errors.New("timeout waiting for response") + } + // this is a tx status frame bytes, so lets parse it out. - status, err := ParseTxStatusFrame(responseFrame) if err != nil { return } @@ -194,7 +208,7 @@ func (sess *Session) writeAddr(p []byte, dest uint64) (n int, err error) { // instead, an AC command must be set to apply the queued changes. `queued` does not // affect query-type commands, which always return right away. // the AT command is an interface. -func (sess *Session) ATCommand(cmd [2]rune, data []byte, queued bool) ([]byte, error) { +func (sess *Session) ATCommand(cmd [2]rune, data []byte, queued bool) (payload []byte, err error) { // we must encode the command, and then create the actual packet. // then we send the packet, and wait for the response // TODO: how to handle multiple-response-packet AT commands? @@ -216,21 +230,21 @@ func (sess *Session) ATCommand(cmd [2]rune, data []byte, queued bool) ([]byte, e return nil, fmt.Errorf("error writing xbee frame: %w", err) } - // TODO: add timeout. - var respBytes []byte + var resp *ATCmdResponse select { - case respBytes = <-ch: + case b := <-ch: + resp, err = ParseATCmdResponse(b) case <-time.After(1 * time.Second): return nil, errors.New("timeout waiting for response frame") } - resp, err := ParseATCmdResponse(respBytes) + + if err != nil { return nil, err } if resp.Status != 0 { - // sinec ATCmdStatus is a stringer thanks to the generator return resp.Data, fmt.Errorf("AT command failed: %v", resp.Status) } @@ -261,13 +275,29 @@ func (sess *Session) RemoteAddr() XBeeAddr { return 0xFFFF } + +func (sess *Session) Dial(addr uint64) (conn *Conn, err error) { + if _, exist := sess.conns[addr]; exist { + return nil, errors.New("address already in use") + } + + rd, wr := io.Pipe() + + conn.rxBuf = bufio.NewReadWriter(bufio.NewReader(rd), bufio.NewWriter(wr)) + conn.addr = XBeeAddr(addr) + conn.parent = sess + + // add it to the list + sess.conns[addr] = conn + return +} + /* The session implements a io.Writer and io.Reader, but does not have a way of connecting to a specific XBee by default. To do this, we would need to either pass an address to the write and read methods (breaking io.ReadWriter), or add another command. Rather than do that, we can make a "Conn" class, which represents a single connection to a device on the network. - */ // Conn is a connection to a specific remote XBee. Conn allows for the user to @@ -276,13 +306,21 @@ a single connection to a device on the network. type Conn struct { parent *Session addr XBeeAddr + + // data is written here by session rxHandler + rxBuf *bufio.ReadWriter } -func (c *Conn) Write(p []byte) (int, error) { +func (c *Conn) Write(p []byte) (n int, err error) { return c.parent.writeAddr(p, uint64(c.addr)) } +func (c *Conn) Read(p []byte) (n int, err error) { + return c.rxBuf.Read(p) +} + func (c *Conn) Close() error { + // remove ourselves from the conn list. return nil }