From 8727dc43c7fd52287c4bb57b606c047b173a4657 Mon Sep 17 00:00:00 2001 From: saji Date: Thu, 25 May 2023 13:01:50 -0500 Subject: [PATCH] cli fixes --- broker.go | 57 +++++++++++++++++- cmd/gotelem/cli/root.go | 3 + cmd/gotelem/cli/server.go | 16 ++++- cmd/gotelem/cli/socketcan.go | 30 +++++++--- cmd/gotelem/cli/xbee.go | 22 ++++--- cmd/skylabify/skylabify.go | 9 +-- frame_gen.go | 83 ------------------------- frame_gen_test.go | 113 ----------------------------------- skylab_logger.go | 6 +- 9 files changed, 108 insertions(+), 231 deletions(-) diff --git a/broker.go b/broker.go index a6285e8..57e9a09 100644 --- a/broker.go +++ b/broker.go @@ -1,6 +1,10 @@ package gotelem -import "fmt" +import ( + "errors" + "fmt" + "sync" +) type BrokerRequest struct { Source string // the name of the sender @@ -82,4 +86,53 @@ func (b *Broker) Unsubscribe(name string) { b.unsubCh <- bc } -// TODO: don't use channels for everything to avoid using a mutex + + + +type JBroker struct { + subs map[string] chan CANDumpJSON // contains the channel for each subsciber + + lock sync.RWMutex +} + +func (b *JBroker) Subscribe(name string) (ch chan CANDumpJSON, err error) { + // get rw lock. + b.lock.Lock() + defer b.lock.Unlock() + _, ok := b.subs[name] + if ok { + return nil, errors.New("name already in use") + } + ch = make(chan CANDumpJSON, 10) + + return +} + +func (b *JBroker) Unsubscribe(name string) { + // if the channel is in use, close it, else do nothing. + b.lock.Lock() + defer b.lock.Unlock() + ch, ok := b.subs[name] + if ok { + close(ch) + } + delete(b.subs, name) +} + +func (b *JBroker) Publish(sender string, message CANDumpJSON) { + go func() { + b.lock.RLock() + defer b.lock.RUnlock() + for name, ch := range b.subs { + if name == sender { + continue + } + // non blocking send. + select { + case ch <- message: + default: + } + } + + }() +} diff --git a/cmd/gotelem/cli/root.go b/cmd/gotelem/cli/root.go index 973e74a..02f30d3 100644 --- a/cmd/gotelem/cli/root.go +++ b/cmd/gotelem/cli/root.go @@ -1,6 +1,7 @@ package cli import ( + "fmt" "log" "os" @@ -20,6 +21,8 @@ func Execute() { Commands: subCmds, } + fmt.Println(serveFlags) + if err := app.Run(os.Args); err != nil { log.Fatal(err) } diff --git a/cmd/gotelem/cli/server.go b/cmd/gotelem/cli/server.go index f82eb1e..c9703a0 100644 --- a/cmd/gotelem/cli/server.go +++ b/cmd/gotelem/cli/server.go @@ -5,6 +5,7 @@ import ( "fmt" "net" "os" + "sync" "time" "github.com/kschamplin/gotelem" @@ -15,8 +16,8 @@ import ( var serveFlags = []cli.Flag{ &cli.StringFlag{ - Name: "device", - Aliases: []string{"d"}, + Name: "xbee", + Aliases: []string{"x"}, Usage: "The XBee to connect to. Leave blank to not use XBee", EnvVars: []string{"XBEE_DEVICE"}, }, @@ -75,10 +76,13 @@ func serve(cCtx *cli.Context) error { go broker.Start() + wg := sync.WaitGroup{} for _, svc := range serveThings { svcLogger := deriveLogger(logger, svc) logger.Info("starting service", "svc", svc.String()) go func(mySvc service) { + wg.Add(1) + defer wg.Done() err := mySvc.Start(cCtx, broker, svcLogger) if err != nil { logger.Error("service stopped!", "err", err, "svc", mySvc.String()) @@ -86,6 +90,10 @@ func serve(cCtx *cli.Context) error { }(svc) } + + wg.Wait() + + // tcp listener server. ln, err := net.Listen("tcp", ":8082") if err != nil { @@ -110,7 +118,6 @@ func tcpSvc(ctx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) error // TODO: extract port/ip from cli context. ln, err := net.Listen("tcp", ":8082") if err != nil { - fmt.Printf("Error listening: %v\n", err) logger.Warn("error listening", "err", err) return err } @@ -195,6 +202,9 @@ func (c *CanLoggerService) Start(cCtx *cli.Context, broker *gotelem.Broker, l * } } + +// XBeeService provides data over an Xbee device, either by serial or TCP +// based on the url provided in the xbee flag. see the description for details. type XBeeService struct { session *xbee.Session } diff --git a/cmd/gotelem/cli/socketcan.go b/cmd/gotelem/cli/socketcan.go index e036050..ffdfb25 100644 --- a/cmd/gotelem/cli/socketcan.go +++ b/cmd/gotelem/cli/socketcan.go @@ -20,25 +20,25 @@ var canDevFlag = &cli.StringFlag{ Aliases: []string{"c"}, Usage: "CAN device string", EnvVars: []string{"CAN_DEVICE"}, - DefaultText: "vcan0", } // this function sets up the `serve` flags and services that use socketCAN func init() { - serveFlags = append(serveFlags, &cli.BoolFlag{Name: "test", Usage: "use vcan0 test"}) - serveFlags = append(serveFlags, canDevFlag) - // add services for server + // add the CAN flags to the serve command + serveCmd.Flags = append(serveCmd.Flags, &cli.BoolFlag{Name: "test", Usage: "use vcan0 test"}) + serveCmd.Flags = append(serveCmd.Flags, canDevFlag) + // add services for server serveThings = append(serveThings, &socketCANService{}) // add can subcommand/actions - // TODO: make socketcan utility commands. + // TODO: make more utility commands. subCmds = append(subCmds, socketCANCmd) } -// FIXME: add logging back in since it's missing rn type socketCANService struct { + name string sock socketcan.CanSocket } @@ -47,23 +47,37 @@ func (s *socketCANService) Status() { } func (s *socketCANService) String() string { - return "" + if s.name == "" { + return "socketCAN" + } + return s.name } func (s *socketCANService) Start(cCtx *cli.Context, broker *gotelem.Broker, logger *slog.Logger) (err error) { // vcan0 demo + if cCtx.String("can") == "" { + return + } + if strings.HasPrefix(cCtx.String("can"), "v") { go vcanTest(cCtx.String("can")) } - rxCh := broker.Subscribe("socketCAN") sock, err := socketcan.NewCanSocket(cCtx.String("can")) if err != nil { logger.Error("error opening socket", "err", err) return } + defer sock.Close() + s.name = sock.Name() + // connect to the broker + rxCh := broker.Subscribe("socketCAN") + defer broker.Unsubscribe("socketCAN") + + + // make a channel to receive socketCAN frames. rxCan := make(chan gotelem.Frame) go func() { diff --git a/cmd/gotelem/cli/xbee.go b/cmd/gotelem/cli/xbee.go index 87c38f9..a34f197 100644 --- a/cmd/gotelem/cli/xbee.go +++ b/cmd/gotelem/cli/xbee.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "os" + "syscall" "github.com/kschamplin/gotelem/xbee" "github.com/urfave/cli/v2" @@ -22,14 +23,13 @@ const ( keyIODevice ctxKey = iota ) -var xbeeDeviceFlag = &cli.StringFlag{ - Name: "device", - Aliases: []string{"d"}, - Usage: "The XBee to connect to", - Required: true, - EnvVars: []string{"XBEE_DEVICE"}, - } - +var xbeeDeviceFlag = &cli.StringFlag{ + Name: "device", + Aliases: []string{"d"}, + Usage: "The XBee to connect to", + Required: true, + EnvVars: []string{"XBEE_DEVICE"}, +} var xbeeCmd = &cli.Command{ Name: "xbee", @@ -92,7 +92,6 @@ writtend to stdout. } func xbeeInfo(ctx *cli.Context) error { - logger := slog.New(slog.NewTextHandler(os.Stderr)) transport := ctx.Context.Value(keyIODevice).(*xbee.Transport) xb, err := xbee.NewSession(transport, logger.With("device", transport.Type())) @@ -106,14 +105,14 @@ func xbeeInfo(ctx *cli.Context) error { } fmt.Printf("Network ID: %X\n", binary.BigEndian.Uint16(b)) return nil - } + func netcat(ctx *cli.Context) error { if ctx.Args().Len() < 1 { cli.ShowSubcommandHelp(ctx) - return cli.Exit("missing [addr] argument", 1) + return cli.Exit("missing [addr] argument", int(syscall.EINVAL)) } logger := slog.New(slog.NewTextHandler(os.Stderr)) @@ -141,4 +140,3 @@ func netcat(ctx *cli.Context) error { return nil } - diff --git a/cmd/skylabify/skylabify.go b/cmd/skylabify/skylabify.go index b436e02..ba6c3a6 100644 --- a/cmd/skylabify/skylabify.go +++ b/cmd/skylabify/skylabify.go @@ -12,6 +12,7 @@ import ( "strings" "syscall" + "github.com/kschamplin/gotelem" "github.com/kschamplin/gotelem/skylab" "github.com/urfave/cli/v2" ) @@ -91,7 +92,7 @@ func run(ctx *cli.Context) (err error) { segments := strings.Split(dumpLine, " ") - var cd candumpJSON + var cd gotelem.CANDumpJSON // this is cursed but easiest way to get a float from a string. fmt.Sscanf(segments[0], "(%g)", &cd.Timestamp) @@ -122,9 +123,3 @@ func run(ctx *cli.Context) (err error) { } } - -type candumpJSON struct { - Timestamp float64 `json:"ts"` - Id uint64 `json:"id"` - Data skylab.Packet `json:"data"` -} diff --git a/frame_gen.go b/frame_gen.go index 26e20ad..2baf6af 100644 --- a/frame_gen.go +++ b/frame_gen.go @@ -159,89 +159,6 @@ func (z CanFilter) Msgsize() (s int) { return } -// DecodeMsg implements msgp.Decodable -func (z *CanWriter) DecodeMsg(dc *msgp.Reader) (err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - return -} - -// EncodeMsg implements msgp.Encodable -func (z CanWriter) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 0 - err = en.Append(0x80) - if err != nil { - return - } - return -} - -// MarshalMsg implements msgp.Marshaler -func (z CanWriter) MarshalMsg(b []byte) (o []byte, err error) { - o = msgp.Require(b, z.Msgsize()) - // map header, size 0 - o = append(o, 0x80) - return -} - -// UnmarshalMsg implements msgp.Unmarshaler -func (z *CanWriter) UnmarshalMsg(bts []byte) (o []byte, err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - o = bts - return -} - -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z CanWriter) Msgsize() (s int) { - s = 1 - return -} - // DecodeMsg implements msgp.Decodable func (z *Frame) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte diff --git a/frame_gen_test.go b/frame_gen_test.go index 6b93c97..c7d8481 100644 --- a/frame_gen_test.go +++ b/frame_gen_test.go @@ -122,119 +122,6 @@ func BenchmarkDecodeCanFilter(b *testing.B) { } } -func TestMarshalUnmarshalCanWriter(t *testing.T) { - v := CanWriter{} - bts, err := v.MarshalMsg(nil) - if err != nil { - t.Fatal(err) - } - left, err := v.UnmarshalMsg(bts) - if err != nil { - t.Fatal(err) - } - if len(left) > 0 { - t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) - } - - left, err = msgp.Skip(bts) - if err != nil { - t.Fatal(err) - } - if len(left) > 0 { - t.Errorf("%d bytes left over after Skip(): %q", len(left), left) - } -} - -func BenchmarkMarshalMsgCanWriter(b *testing.B) { - v := CanWriter{} - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - v.MarshalMsg(nil) - } -} - -func BenchmarkAppendMsgCanWriter(b *testing.B) { - v := CanWriter{} - bts := make([]byte, 0, v.Msgsize()) - bts, _ = v.MarshalMsg(bts[0:0]) - b.SetBytes(int64(len(bts))) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - bts, _ = v.MarshalMsg(bts[0:0]) - } -} - -func BenchmarkUnmarshalCanWriter(b *testing.B) { - v := CanWriter{} - bts, _ := v.MarshalMsg(nil) - b.ReportAllocs() - b.SetBytes(int64(len(bts))) - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := v.UnmarshalMsg(bts) - if err != nil { - b.Fatal(err) - } - } -} - -func TestEncodeDecodeCanWriter(t *testing.T) { - v := CanWriter{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - - m := v.Msgsize() - if buf.Len() > m { - t.Log("WARNING: TestEncodeDecodeCanWriter Msgsize() is inaccurate") - } - - vn := CanWriter{} - err := msgp.Decode(&buf, &vn) - if err != nil { - t.Error(err) - } - - buf.Reset() - msgp.Encode(&buf, &v) - err = msgp.NewReader(&buf).Skip() - if err != nil { - t.Error(err) - } -} - -func BenchmarkEncodeCanWriter(b *testing.B) { - v := CanWriter{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - b.SetBytes(int64(buf.Len())) - en := msgp.NewWriter(msgp.Nowhere) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - v.EncodeMsg(en) - } - en.Flush() -} - -func BenchmarkDecodeCanWriter(b *testing.B) { - v := CanWriter{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - b.SetBytes(int64(buf.Len())) - rd := msgp.NewEndlessReader(buf.Bytes(), b) - dc := msgp.NewReader(rd) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - err := v.DecodeMsg(dc) - if err != nil { - b.Fatal(err) - } - } -} - func TestMarshalUnmarshalFrame(t *testing.T) { v := Frame{} bts, err := v.MarshalMsg(nil) diff --git a/skylab_logger.go b/skylab_logger.go index d622f64..fc65fe8 100644 --- a/skylab_logger.go +++ b/skylab_logger.go @@ -11,8 +11,8 @@ import ( // CanWriter type CanWriter struct { - output *os.File - cd candumpJSON + output *os.File + cd CANDumpJSON jsonBuf []byte } @@ -51,7 +51,7 @@ func OpenCanWriter(name string) (*CanWriter, error) { return cw, nil } -type candumpJSON struct { +type CANDumpJSON struct { Timestamp float64 `json:"ts"` Id uint64 `json:"id"` Data skylab.Packet `json:"data"`