Compare commits
46 commits
separate-p
...
master
Author | SHA1 | Date | |
---|---|---|---|
![]() |
900ad6d495 | ||
![]() |
16cc7019bc | ||
![]() |
4566ea369c | ||
![]() |
1e72b93143 | ||
![]() |
48b40ee30f | ||
![]() |
9e9081fa4a | ||
![]() |
2dc5a0457b | ||
![]() |
0f2af76156 | ||
![]() |
2e36581665 | ||
![]() |
4829dd50c7 | ||
![]() |
13205c1668 | ||
![]() |
fe4cdfa0a4 | ||
![]() |
e9d40ce466 | ||
![]() |
d702395d5b | ||
![]() |
90e8c3f101 | ||
![]() |
a28393388b | ||
![]() |
54b7427428 | ||
![]() |
e08ab050ef | ||
![]() |
cf112ef561 | ||
![]() |
641c35afbd | ||
![]() |
4a292aa009 | ||
![]() |
d5381a3c33 | ||
![]() |
0b5a917e40 | ||
![]() |
1ff4adf5e4 | ||
![]() |
c8034066c9 | ||
![]() |
3c1a96c8e0 | ||
![]() |
f380631b5e | ||
![]() |
456f84b5c7 | ||
![]() |
daf4fe97dc | ||
![]() |
5b38daf74f | ||
![]() |
7a98f52542 | ||
![]() |
0a6a6bb66d | ||
![]() |
c9b73ee006 | ||
![]() |
b266a84324 | ||
![]() |
d591fa21b6 | ||
![]() |
8e314e9303 | ||
![]() |
d90d7a0af4 | ||
![]() |
9ec01c39de | ||
![]() |
bcd61321e6 | ||
![]() |
a015911e0e | ||
![]() |
648f2183c2 | ||
![]() |
860d749c6b | ||
![]() |
058e8d31b2 | ||
![]() |
8b8619dd8a | ||
![]() |
93be82f416 | ||
![]() |
4e6f8db7ed |
46
.github/workflows/build-openmct.yml
vendored
Normal file
46
.github/workflows/build-openmct.yml
vendored
Normal file
|
@ -0,0 +1,46 @@
|
|||
name: release
|
||||
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- '*'
|
||||
|
||||
jobs:
|
||||
release-full:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: 1.22
|
||||
- name: Set up Node
|
||||
uses: actions/setup-node@v4
|
||||
- name: Install OpenMCT
|
||||
run: npm ci
|
||||
working-directory: web/
|
||||
- name: Build OpenMCT bundle
|
||||
run: npm run build
|
||||
working-directory: web/
|
||||
- name: Build
|
||||
uses: crazy-max/ghaction-xgo@v3
|
||||
with:
|
||||
xgo_version: latest
|
||||
go_version: 1.21
|
||||
pkg: cmd/gotelem
|
||||
dest: build
|
||||
prefix: gotelem-full
|
||||
targets: windows/amd64,linux/amd64,linux/arm64,linux/arm/v7,darwin/arm64,darwin/amd64
|
||||
tags: openmct
|
||||
v: true
|
||||
x: false
|
||||
race: false
|
||||
ldflags: -s -w
|
||||
buildmode: default
|
||||
trimpath: true
|
||||
- name: Release binaries
|
||||
uses: https://gitea.com/actions/release-action@main
|
||||
with:
|
||||
files: |-
|
||||
build/**
|
||||
api_key: '${{secrets.RELEASE_TOKEN}}'
|
4
.github/workflows/go.yml
vendored
4
.github/workflows/go.yml
vendored
|
@ -3,10 +3,12 @@ name: Go
|
|||
on:
|
||||
push:
|
||||
branches: [ "master" ]
|
||||
paths:
|
||||
- "**.go"
|
||||
|
||||
jobs:
|
||||
|
||||
build:
|
||||
build-gotelem:
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
|
|
25
.github/workflows/nodejs.yml
vendored
Normal file
25
.github/workflows/nodejs.yml
vendored
Normal file
|
@ -0,0 +1,25 @@
|
|||
name: Node.js CI
|
||||
|
||||
on:
|
||||
push:
|
||||
paths:
|
||||
- "web/**"
|
||||
|
||||
jobs:
|
||||
build-openmct:
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
defaults:
|
||||
run:
|
||||
working-directory: ./web/
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Use Node.js
|
||||
uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: '20.x'
|
||||
- run: npm ci
|
||||
- run: npm run build --if-present
|
||||
- run: npx eslint .
|
||||
|
6
.gitignore
vendored
6
.gitignore
vendored
|
@ -27,4 +27,8 @@ go.work
|
|||
/skylabify
|
||||
*.db
|
||||
*.db-journal
|
||||
/logs/
|
||||
/logs/
|
||||
|
||||
*.db-wal
|
||||
*.db-shm
|
||||
*.sqbpro
|
||||
|
|
19
broker.go
19
broker.go
|
@ -9,6 +9,8 @@ import (
|
|||
"github.com/kschamplin/gotelem/skylab"
|
||||
)
|
||||
|
||||
// Broker is a Bus Event broadcast system. You can subscribe to events,
|
||||
// and send events.
|
||||
type Broker struct {
|
||||
subs map[string]chan skylab.BusEvent // contains the channel for each subsciber
|
||||
|
||||
|
@ -17,6 +19,7 @@ type Broker struct {
|
|||
bufsize int // size of chan buffer in elements.
|
||||
}
|
||||
|
||||
// NewBroker creates a new broker with a given logger.
|
||||
func NewBroker(bufsize int, logger *slog.Logger) *Broker {
|
||||
return &Broker{
|
||||
subs: make(map[string]chan skylab.BusEvent),
|
||||
|
@ -25,6 +28,7 @@ func NewBroker(bufsize int, logger *slog.Logger) *Broker {
|
|||
}
|
||||
}
|
||||
|
||||
// Subscribe joins the broker with the given name. The name must be unique.
|
||||
func (b *Broker) Subscribe(name string) (ch chan skylab.BusEvent, err error) {
|
||||
// get rw lock.
|
||||
b.lock.Lock()
|
||||
|
@ -33,23 +37,33 @@ func (b *Broker) Subscribe(name string) (ch chan skylab.BusEvent, err error) {
|
|||
if ok {
|
||||
return nil, errors.New("name already in use")
|
||||
}
|
||||
b.logger.Info("new subscriber", "name", name)
|
||||
b.logger.Info("subscribe", "name", name)
|
||||
ch = make(chan skylab.BusEvent, b.bufsize)
|
||||
|
||||
b.subs[name] = ch
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
// Unsubscribe removes a subscriber matching the name. It doesn't do anything
|
||||
// if there's nobody subscribed with that name
|
||||
func (b *Broker) Unsubscribe(name string) {
|
||||
// remove the channel from the map. We don't need to close it.
|
||||
b.lock.Lock()
|
||||
defer b.lock.Unlock()
|
||||
delete(b.subs, name)
|
||||
b.logger.Debug("unsubscribe", "name", name)
|
||||
if _, ok := b.subs[name]; ok {
|
||||
close(b.subs[name])
|
||||
delete(b.subs, name)
|
||||
}
|
||||
}
|
||||
|
||||
// Publish sends a bus event to all subscribers. It includes a sender
|
||||
// string which prevents loopback.
|
||||
func (b *Broker) Publish(sender string, message skylab.BusEvent) {
|
||||
b.lock.RLock()
|
||||
defer b.lock.RUnlock()
|
||||
b.logger.Debug("publish", "sender", sender, "message", message)
|
||||
for name, ch := range b.subs {
|
||||
if name == sender {
|
||||
continue
|
||||
|
@ -57,7 +71,6 @@ func (b *Broker) Publish(sender string, message skylab.BusEvent) {
|
|||
// non blocking send.
|
||||
select {
|
||||
case ch <- message:
|
||||
b.logger.Debug("sent message", "dest", name, "src", sender)
|
||||
default:
|
||||
b.logger.Warn("recipient buffer full", "dest", name)
|
||||
}
|
||||
|
|
122
broker_test.go
Normal file
122
broker_test.go
Normal file
|
@ -0,0 +1,122 @@
|
|||
package gotelem
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/kschamplin/gotelem/skylab"
|
||||
)
|
||||
|
||||
func makeEvent() skylab.BusEvent {
|
||||
var pkt skylab.Packet = &skylab.BmsMeasurement{
|
||||
BatteryVoltage: 12000,
|
||||
AuxVoltage: 24000,
|
||||
Current: 1.23,
|
||||
}
|
||||
return skylab.BusEvent{
|
||||
Timestamp: time.Now(),
|
||||
Name: pkt.String(),
|
||||
Data: pkt,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
func TestBroker(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("test send", func(t *testing.T) {
|
||||
flog := slog.New(slog.NewTextHandler(os.Stderr, nil))
|
||||
broker := NewBroker(10, flog)
|
||||
|
||||
sub, err := broker.Subscribe("testSub")
|
||||
if err != nil {
|
||||
t.Fatalf("error subscribing: %v", err)
|
||||
}
|
||||
testEvent := makeEvent()
|
||||
|
||||
go func() {
|
||||
time.Sleep(time.Millisecond * 1)
|
||||
broker.Publish("other", testEvent)
|
||||
}()
|
||||
|
||||
var recvEvent skylab.BusEvent
|
||||
select {
|
||||
case recvEvent = <-sub:
|
||||
if !testEvent.Equals(&recvEvent) {
|
||||
t.Fatalf("events not equal, want %v got %v", testEvent, recvEvent)
|
||||
}
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("timeout waiting for packet")
|
||||
}
|
||||
|
||||
})
|
||||
t.Run("multiple broadcast", func(t *testing.T) {
|
||||
flog := slog.New(slog.NewTextHandler(os.Stderr, nil))
|
||||
broker := NewBroker(10, flog)
|
||||
testEvent := makeEvent()
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
clientFn := func(name string) {
|
||||
sub, err := broker.Subscribe(name)
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
return
|
||||
}
|
||||
<-sub
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
wg.Add(2)
|
||||
go clientFn("client1")
|
||||
go clientFn("client2")
|
||||
|
||||
// yes this is stupid. otherwise we race.
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
broker.Publish("sender", testEvent)
|
||||
|
||||
done := make(chan bool)
|
||||
go func() {
|
||||
wg.Wait()
|
||||
done <- true
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("timeout waiting for clients")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("name collision", func(t *testing.T) {
|
||||
flog := slog.New(slog.NewTextHandler(os.Stderr, nil))
|
||||
broker := NewBroker(10, flog)
|
||||
_, err := broker.Subscribe("collide")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = broker.Subscribe("collide")
|
||||
if err == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
t.Run("unsubscribe", func(t *testing.T) {
|
||||
flog := slog.New(slog.NewTextHandler(os.Stderr, nil))
|
||||
broker := NewBroker(10, flog)
|
||||
ch, err := broker.Subscribe("test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
broker.Unsubscribe("test")
|
||||
_, ok := <-ch
|
||||
if ok {
|
||||
t.Fatal("expected dead channel, but channel returned result")
|
||||
}
|
||||
})
|
||||
}
|
|
@ -9,7 +9,7 @@ import (
|
|||
"strings"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/kschamplin/gotelem/internal/db"
|
||||
"github.com/kschamplin/gotelem"
|
||||
"github.com/kschamplin/gotelem/internal/logparsers"
|
||||
"github.com/kschamplin/gotelem/skylab"
|
||||
"github.com/urfave/cli/v2"
|
||||
|
@ -81,7 +81,7 @@ func importAction(ctx *cli.Context) error {
|
|||
}
|
||||
|
||||
dbPath := ctx.Path("database")
|
||||
db, err := db.OpenTelemDb(dbPath)
|
||||
db, err := gotelem.OpenTelemDb(dbPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening database: %w", err)
|
||||
}
|
||||
|
|
|
@ -5,14 +5,14 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"math"
|
||||
"time"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"log/slog"
|
||||
|
||||
"github.com/kschamplin/gotelem"
|
||||
"github.com/kschamplin/gotelem/internal/api"
|
||||
"github.com/kschamplin/gotelem/internal/db"
|
||||
"github.com/kschamplin/gotelem/skylab"
|
||||
"github.com/kschamplin/gotelem/xbee"
|
||||
"github.com/urfave/cli/v2"
|
||||
|
@ -37,6 +37,10 @@ var serveFlags = []cli.Flag{
|
|||
DefaultText: "gotelem.db",
|
||||
Usage: "database to serve, if not specified will use memory",
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "demo",
|
||||
Usage: "enable the demo packet stream",
|
||||
},
|
||||
}
|
||||
|
||||
var serveCmd = &cli.Command{
|
||||
|
@ -54,12 +58,11 @@ var serveCmd = &cli.Command{
|
|||
type service interface {
|
||||
fmt.Stringer
|
||||
Start(cCtx *cli.Context, deps svcDeps) (err error)
|
||||
Status()
|
||||
}
|
||||
|
||||
type svcDeps struct {
|
||||
Broker *gotelem.Broker
|
||||
Db *db.TelemDb
|
||||
Db *gotelem.TelemDb
|
||||
Logger *slog.Logger
|
||||
}
|
||||
|
||||
|
@ -68,9 +71,8 @@ type svcDeps struct {
|
|||
// or if certain features are present (see cli/sqlite.go)
|
||||
var serveThings = []service{
|
||||
&xBeeService{},
|
||||
// &canLoggerService{},
|
||||
&dbWriterService{},
|
||||
&httpService{},
|
||||
&DemoService{},
|
||||
}
|
||||
|
||||
func serve(cCtx *cli.Context) error {
|
||||
|
@ -94,12 +96,12 @@ func serve(cCtx *cli.Context) error {
|
|||
broker := gotelem.NewBroker(20, logger.WithGroup("broker"))
|
||||
|
||||
// open database
|
||||
dbPath := "file::memory:?cache=shared"
|
||||
dbPath := "gotelem.db"
|
||||
if cCtx.IsSet("db") {
|
||||
dbPath = cCtx.Path("db")
|
||||
}
|
||||
logger.Info("opening database", "path", dbPath)
|
||||
db, err := db.OpenTelemDb(dbPath)
|
||||
db, err := gotelem.OpenTelemDb(dbPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -113,17 +115,17 @@ func serve(cCtx *cli.Context) error {
|
|||
}
|
||||
|
||||
for _, svc := range serveThings {
|
||||
logger.Info("starting service", "svc", svc.String())
|
||||
logger.Info("starting service", "service", svc.String())
|
||||
wg.Add(1)
|
||||
go func(mySvc service, baseLogger *slog.Logger) {
|
||||
svcLogger := logger.With("svc", mySvc.String())
|
||||
svcLogger := logger.With("service", mySvc.String())
|
||||
s := deps
|
||||
s.Logger = svcLogger
|
||||
defer wg.Done()
|
||||
// TODO: recover
|
||||
err := mySvc.Start(cCtx, s)
|
||||
if err != nil {
|
||||
logger.Error("service stopped!", "err", err, "svc", mySvc.String())
|
||||
logger.Error("service stopped!", "err", err, "service", mySvc.String())
|
||||
}
|
||||
}(svc, logger)
|
||||
}
|
||||
|
@ -148,6 +150,7 @@ func (x *xBeeService) Status() {
|
|||
func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||
logger := deps.Logger
|
||||
broker := deps.Broker
|
||||
tdb := deps.Db
|
||||
if cCtx.String("xbee") == "" {
|
||||
logger.Info("not using xbee")
|
||||
return
|
||||
|
@ -175,8 +178,6 @@ func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
|||
xbeeTxer := json.NewEncoder(x.session)
|
||||
xbeeRxer := json.NewDecoder(x.session)
|
||||
|
||||
// xbeePackets := make(chan skylab.BusEvent)
|
||||
// background task to read json packets off of the xbee and send them to the
|
||||
go func() {
|
||||
for {
|
||||
var p skylab.BusEvent
|
||||
|
@ -185,6 +186,7 @@ func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
|||
logger.Error("failed to decode xbee packet")
|
||||
}
|
||||
broker.Publish("xbee", p)
|
||||
tdb.AddEventsCtx(cCtx.Context, p)
|
||||
}
|
||||
}()
|
||||
for {
|
||||
|
@ -194,7 +196,7 @@ func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
|||
return
|
||||
case msg := <-rxCh:
|
||||
logger.Info("got msg", "msg", msg)
|
||||
xbeeTxer.Encode(msg)
|
||||
err := xbeeTxer.Encode(msg)
|
||||
if err != nil {
|
||||
logger.Warn("error writing to xbee", "err", err)
|
||||
}
|
||||
|
@ -220,7 +222,7 @@ func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
|||
broker := deps.Broker
|
||||
db := deps.Db
|
||||
|
||||
r := api.TelemRouter(logger, broker, db)
|
||||
r := gotelem.TelemRouter(logger, broker, db)
|
||||
|
||||
//
|
||||
|
||||
|
@ -240,33 +242,58 @@ func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// dbWriterService listens to the CAN packet broker and saves packets to the database.
|
||||
type dbWriterService struct {
|
||||
|
||||
type DemoService struct {
|
||||
}
|
||||
|
||||
func (d *dbWriterService) Status() {
|
||||
|
||||
func (d *DemoService) String() string {
|
||||
return "demo service"
|
||||
}
|
||||
|
||||
func (d *dbWriterService) String() string {
|
||||
return "db logger"
|
||||
}
|
||||
|
||||
func (d *dbWriterService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||
|
||||
// put CAN packets from the broker into the database.
|
||||
tdb := deps.Db
|
||||
rxCh, err := deps.Broker.Subscribe("dbWriter")
|
||||
defer deps.Broker.Unsubscribe("dbWriter")
|
||||
|
||||
// TODO: add buffering + timeout/backpressure
|
||||
func (d *DemoService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
||||
if !cCtx.Bool("demo") {
|
||||
return
|
||||
}
|
||||
|
||||
broker := deps.Broker
|
||||
bmsPkt := &skylab.BmsMeasurement{
|
||||
Current: 1.23,
|
||||
BatteryVoltage: 11111,
|
||||
AuxVoltage: 22222,
|
||||
}
|
||||
wslPkt := &skylab.WslVelocity{
|
||||
MotorVelocity: 0,
|
||||
VehicleVelocity: 100.0,
|
||||
}
|
||||
var next skylab.Packet = bmsPkt
|
||||
for {
|
||||
select {
|
||||
case msg := <-rxCh:
|
||||
tdb.AddEventsCtx(cCtx.Context, msg)
|
||||
case <-cCtx.Done():
|
||||
return
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
// send the next packet.
|
||||
if next == bmsPkt {
|
||||
bmsPkt.Current = float32(math.Sin(float64(time.Now().UnixMilli()) / 2000.0))
|
||||
ev := skylab.BusEvent{
|
||||
Timestamp: time.Now(),
|
||||
Name: next.String(),
|
||||
Data: next,
|
||||
}
|
||||
broker.Publish("livestream", ev)
|
||||
next = wslPkt
|
||||
} else {
|
||||
// send the wsl
|
||||
ev := skylab.BusEvent{
|
||||
Timestamp: time.Now(),
|
||||
Name: next.String(),
|
||||
Data: next,
|
||||
}
|
||||
broker.Publish("livestream", ev)
|
||||
next = bmsPkt
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,8 @@
|
|||
package cli
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/kschamplin/gotelem/internal/can"
|
||||
|
@ -55,9 +57,10 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
|||
|
||||
logger := deps.Logger
|
||||
broker := deps.Broker
|
||||
tdb := deps.Db
|
||||
|
||||
if !cCtx.IsSet("can") {
|
||||
logger.Info("no can device provided")
|
||||
logger.Debug("no can device provided, skip")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -82,6 +85,9 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
|||
go func() {
|
||||
for {
|
||||
pkt, err := s.sock.Recv()
|
||||
if errors.Is(err, io.EOF) {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
logger.Warn("error receiving CAN packet", "err", err)
|
||||
}
|
||||
|
@ -95,23 +101,29 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
|
|||
case msg := <-rxCh:
|
||||
|
||||
frame, err = skylab.ToCanFrame(msg.Data)
|
||||
|
||||
if err != nil {
|
||||
logger.Warn("error encoding can frame", "name", msg.Name, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
s.sock.Send(&frame)
|
||||
|
||||
case msg := <-rxCan:
|
||||
p, err := skylab.FromCanFrame(msg)
|
||||
if err != nil {
|
||||
logger.Warn("error parsing can packet", "id", msg.Id)
|
||||
logger.Warn("error parsing can packet", "id", msg.Id, "err", err)
|
||||
continue
|
||||
}
|
||||
cde := skylab.BusEvent{
|
||||
event := skylab.BusEvent{
|
||||
Timestamp: time.Now(),
|
||||
Name: p.String(),
|
||||
Name: p.String(),
|
||||
Data: p,
|
||||
}
|
||||
broker.Publish("socketCAN", cde)
|
||||
broker.Publish("socketCAN", event)
|
||||
tdb.AddEventsCtx(cCtx.Context, event)
|
||||
case <-cCtx.Done():
|
||||
// close the socket.
|
||||
s.sock.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -92,7 +92,7 @@ func run(ctx *cli.Context) (err error) {
|
|||
|
||||
fileReader := bufio.NewReader(istream)
|
||||
|
||||
var pfun logparsers.BusParserFunc
|
||||
var pfun logparsers.BusEventParser
|
||||
|
||||
pfun, ok := logparsers.ParsersMap[ctx.String("format")]
|
||||
if !ok {
|
||||
|
|
388
db.go
Normal file
388
db.go
Normal file
|
@ -0,0 +1,388 @@
|
|||
package gotelem
|
||||
|
||||
// this file implements the database functions to load/store/read from a sql database.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/kschamplin/gotelem/skylab"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
type TelemDb struct {
|
||||
db *sqlx.DB
|
||||
}
|
||||
|
||||
|
||||
// this function is internal use. It actually opens the database, but uses
|
||||
// a raw path string instead of formatting one like the exported functions.
|
||||
func OpenRawDb(rawpath string) (tdb *TelemDb, err error) {
|
||||
tdb = &TelemDb{}
|
||||
tdb.db, err = sqlx.Connect("sqlite3", rawpath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// perform any database migrations
|
||||
version, err := tdb.GetVersion()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// TODO: use logging instead of printf
|
||||
fmt.Printf("starting version %d\n", version)
|
||||
|
||||
version, err = RunMigrations(tdb)
|
||||
fmt.Printf("ending version %d\n", version)
|
||||
|
||||
return tdb, err
|
||||
}
|
||||
|
||||
// this string is used to open the read-write db.
|
||||
// the extra options improve performance significantly.
|
||||
const ProductionDbURI = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000"
|
||||
|
||||
// OpenTelemDb opens a new telemetry database at the given path.
|
||||
func OpenTelemDb(path string) (*TelemDb, error) {
|
||||
dbStr := fmt.Sprintf(ProductionDbURI, path)
|
||||
return OpenRawDb(dbStr)
|
||||
}
|
||||
|
||||
func (tdb *TelemDb) GetVersion() (int, error) {
|
||||
var version int
|
||||
err := tdb.db.Get(&version, "PRAGMA user_version")
|
||||
return version, err
|
||||
}
|
||||
|
||||
func (tdb *TelemDb) SetVersion(version int) error {
|
||||
stmt := fmt.Sprintf("PRAGMA user_version = %d", version)
|
||||
_, err := tdb.db.Exec(stmt)
|
||||
return err
|
||||
}
|
||||
|
||||
// sql expression to insert a bus event into the packets database.1
|
||||
const sqlInsertEvent = `INSERT INTO "bus_events" (ts, name, data) VALUES `
|
||||
|
||||
// AddEvent adds the bus event to the database.
|
||||
func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) {
|
||||
// edge case - zero events.
|
||||
if len(events) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
n = 0
|
||||
tx, err := tdb.db.BeginTx(ctx, nil)
|
||||
defer tx.Rollback()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
sqlStmt := sqlInsertEvent
|
||||
const rowSql = "(?, ?, json(?))"
|
||||
inserts := make([]string, len(events))
|
||||
vals := []interface{}{}
|
||||
idx := 0 // we have to manually increment, because sometimes we don't insert.
|
||||
for _, b := range events {
|
||||
inserts[idx] = rowSql
|
||||
var j []byte
|
||||
j, err = json.Marshal(b.Data)
|
||||
|
||||
if err != nil {
|
||||
// we had some error turning the packet into json.
|
||||
continue // we silently skip.
|
||||
}
|
||||
|
||||
vals = append(vals, b.Timestamp.UnixMilli(), b.Data.String(), j)
|
||||
idx++
|
||||
}
|
||||
|
||||
// construct the full statement now
|
||||
sqlStmt = sqlStmt + strings.Join(inserts[:idx], ",")
|
||||
stmt, err := tx.PrepareContext(ctx, sqlStmt)
|
||||
// defer stmt.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
res, err := stmt.ExecContext(ctx, vals...)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n, err = res.RowsAffected()
|
||||
|
||||
tx.Commit()
|
||||
return
|
||||
}
|
||||
|
||||
func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) {
|
||||
|
||||
return tdb.AddEventsCtx(context.Background(), events...)
|
||||
}
|
||||
|
||||
// LimitOffsetModifier is a modifier to support pagniation.
|
||||
type LimitOffsetModifier struct {
|
||||
Limit int
|
||||
Offset int
|
||||
}
|
||||
|
||||
func (l *LimitOffsetModifier) ModifyStatement(sb *strings.Builder) error {
|
||||
clause := fmt.Sprintf(" LIMIT %d OFFSET %d", l.Limit, l.Offset)
|
||||
sb.WriteString(clause)
|
||||
return nil
|
||||
}
|
||||
|
||||
// BusEventFilter is a filter for bus events.
|
||||
type BusEventFilter struct {
|
||||
Names []string // The name(s) of packets to filter for
|
||||
StartTime time.Time // Starting time range. All packets >= StartTime
|
||||
EndTime time.Time // Ending time range. All packets <= EndTime
|
||||
Indexes []int // The specific index of the packets to index.
|
||||
}
|
||||
|
||||
// now we can optionally add a limit.
|
||||
|
||||
func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, lim *LimitOffsetModifier) ([]skylab.BusEvent, error) {
|
||||
// construct a simple
|
||||
var whereFrags = make([]string, 0)
|
||||
|
||||
// if we're filtering by names, add a where clause for it.
|
||||
if len(filter.Names) > 0 {
|
||||
// we have to quote our individual names
|
||||
names := strings.Join(filter.Names, `", "`)
|
||||
qString := fmt.Sprintf(`name IN ("%s")`, names)
|
||||
whereFrags = append(whereFrags, qString)
|
||||
}
|
||||
// TODO: identify if we need a special case for both time ranges
|
||||
// using BETWEEN since apparenlty that can be better?
|
||||
|
||||
// next, check if we have a start/end time, add constraints
|
||||
if !filter.EndTime.IsZero() {
|
||||
qString := fmt.Sprintf("ts <= %d", filter.EndTime.UnixMilli())
|
||||
whereFrags = append(whereFrags, qString)
|
||||
}
|
||||
if !filter.StartTime.IsZero() {
|
||||
// we have an end range
|
||||
qString := fmt.Sprintf("ts >= %d", filter.StartTime.UnixMilli())
|
||||
whereFrags = append(whereFrags, qString)
|
||||
}
|
||||
if len(filter.Indexes) > 0 {
|
||||
s := make([]string, 0)
|
||||
for _, idx := range filter.Indexes {
|
||||
s = append(s, fmt.Sprint(idx))
|
||||
}
|
||||
idxs := strings.Join(s, ", ")
|
||||
qString := fmt.Sprintf(`idx in (%s)`, idxs)
|
||||
whereFrags = append(whereFrags, qString)
|
||||
}
|
||||
|
||||
sb := strings.Builder{}
|
||||
sb.WriteString(`SELECT ts, name, data from "bus_events"`)
|
||||
// construct the full statement.
|
||||
if len(whereFrags) > 0 {
|
||||
// use the where clauses.
|
||||
sb.WriteString(" WHERE ")
|
||||
sb.WriteString(strings.Join(whereFrags, " AND "))
|
||||
}
|
||||
|
||||
sb.WriteString(" ORDER BY ts DESC")
|
||||
|
||||
// Augment our data further if there's i.e a limit modifier.
|
||||
// TODO: factor this out maybe?
|
||||
if lim != nil {
|
||||
lim.ModifyStatement(&sb)
|
||||
}
|
||||
rows, err := tdb.db.QueryxContext(ctx, sb.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var events = make([]skylab.BusEvent, 0, 10)
|
||||
|
||||
for rows.Next() {
|
||||
var ev skylab.RawJsonEvent
|
||||
err := rows.Scan(&ev.Timestamp, &ev.Name, (*[]byte)(&ev.Data))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
BusEv := skylab.BusEvent{
|
||||
Timestamp: time.UnixMilli(int64(ev.Timestamp)),
|
||||
Name: ev.Name,
|
||||
}
|
||||
BusEv.Data, err = skylab.FromJson(ev.Name, ev.Data)
|
||||
if err != nil {
|
||||
return events, nil
|
||||
}
|
||||
events = append(events, BusEv)
|
||||
}
|
||||
|
||||
err = rows.Err()
|
||||
|
||||
return events, err
|
||||
}
|
||||
|
||||
// We now need a different use-case: we would like to extract a value from
|
||||
// a specific packet.
|
||||
|
||||
// Datum is a single measurement - it is more granular than a packet.
|
||||
// the classic example is bms_measurement.current
|
||||
type Datum struct {
|
||||
Timestamp time.Time `db:"timestamp" json:"ts"`
|
||||
Value any `db:"val" json:"val"`
|
||||
}
|
||||
|
||||
// GetValues queries the database for values in a given time range.
|
||||
// A value is a specific data point. For example, bms_measurement.current
|
||||
// would be a value.
|
||||
func (tdb *TelemDb) GetValues(ctx context.Context, filter BusEventFilter,
|
||||
field string, lim *LimitOffsetModifier) ([]Datum, error) {
|
||||
// this fragment uses json_extract from sqlite to get a single
|
||||
// nested value.
|
||||
sb := strings.Builder{}
|
||||
sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `)
|
||||
if len(filter.Names) != 1 {
|
||||
return nil, errors.New("invalid number of names")
|
||||
}
|
||||
whereFrags := []string{"name is ?"}
|
||||
|
||||
if !filter.StartTime.IsZero() {
|
||||
qString := fmt.Sprintf("ts >= %d", filter.StartTime.UnixMilli())
|
||||
whereFrags = append(whereFrags, qString)
|
||||
}
|
||||
|
||||
if !filter.EndTime.IsZero() {
|
||||
qString := fmt.Sprintf("ts <= %d", filter.EndTime.UnixMilli())
|
||||
whereFrags = append(whereFrags, qString)
|
||||
}
|
||||
if len(filter.Indexes) > 0 {
|
||||
s := make([]string, 0)
|
||||
for _, idx := range filter.Indexes {
|
||||
s = append(s, fmt.Sprint(idx))
|
||||
}
|
||||
idxs := strings.Join(s, ", ")
|
||||
qString := fmt.Sprintf(`idx in (%s)`, idxs)
|
||||
whereFrags = append(whereFrags, qString)
|
||||
}
|
||||
// join qstrings with AND
|
||||
sb.WriteString(strings.Join(whereFrags, " AND "))
|
||||
|
||||
sb.WriteString(" ORDER BY ts DESC")
|
||||
|
||||
if lim != nil {
|
||||
lim.ModifyStatement(&sb)
|
||||
}
|
||||
|
||||
rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, filter.Names[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
data := make([]Datum, 0, 10)
|
||||
for rows.Next() {
|
||||
var d Datum = Datum{}
|
||||
var ts int64
|
||||
err = rows.Scan(&ts, &d.Value)
|
||||
d.Timestamp = time.UnixMilli(ts)
|
||||
|
||||
if err != nil {
|
||||
fmt.Print(err)
|
||||
return data, err
|
||||
}
|
||||
data = append(data, d)
|
||||
}
|
||||
fmt.Print(rows.Err())
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// AddDocument inserts a new document to the store if it is unique and valid.
|
||||
func (tdb *TelemDb) AddDocument(ctx context.Context, obj json.RawMessage) error {
|
||||
const insertStmt = `INSERT INTO openmct_objects (data) VALUES (json(?))`
|
||||
_, err := tdb.db.ExecContext(ctx, insertStmt, obj)
|
||||
return err
|
||||
}
|
||||
|
||||
// DocumentNotFoundError is when the underlying document cannot be found.
|
||||
type DocumentNotFoundError string
|
||||
|
||||
func (e DocumentNotFoundError) Error() string {
|
||||
return fmt.Sprintf("document could not find key: %s", string(e))
|
||||
}
|
||||
|
||||
// UpdateDocument replaces the entire contents of a document matching
|
||||
// the given key. Note that the key is derived from the document,
|
||||
// and no checks are done to ensure that the new key is the same.
|
||||
func (tdb *TelemDb) UpdateDocument(ctx context.Context, key string,
|
||||
obj json.RawMessage) error {
|
||||
|
||||
const upd = `UPDATE openmct_objects SET data = json(?) WHERE key IS ?`
|
||||
r, err := tdb.db.ExecContext(ctx, upd, obj, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n, err := r.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n != 1 {
|
||||
return DocumentNotFoundError(key)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// GetDocument gets the document matching the corresponding key.
|
||||
func (tdb *TelemDb) GetDocument(ctx context.Context, key string) (json.RawMessage, error) {
|
||||
const get = `SELECT data FROM openmct_objects WHERE key IS ?`
|
||||
|
||||
row := tdb.db.QueryRowxContext(ctx, get, key)
|
||||
|
||||
var res []byte // VERY important, json.RawMessage won't work here
|
||||
// since the scan function does not look at underlying types.
|
||||
row.Scan(&res)
|
||||
|
||||
if len(res) == 0 {
|
||||
return nil, DocumentNotFoundError(key)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// GetAllDocuments returns all documents in the database.
|
||||
func (tdb *TelemDb) GetAllDocuments(ctx context.Context) ([]json.RawMessage, error) {
|
||||
const getall = `SELECT data FROM openmct_objects`
|
||||
|
||||
rows, err := tdb.db.QueryxContext(ctx, getall)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
docs := make([]json.RawMessage, 0)
|
||||
for rows.Next() {
|
||||
var j json.RawMessage
|
||||
rows.Scan(&j)
|
||||
docs = append(docs, j)
|
||||
}
|
||||
return docs, nil
|
||||
}
|
||||
|
||||
// DeleteDocument removes a document from the store, or errors
|
||||
// if it does not exist.
|
||||
func (tdb *TelemDb) DeleteDocument(ctx context.Context, key string) error {
|
||||
const del = `DELETE FROM openmct_objects WHERE key IS ?`
|
||||
res, err := tdb.db.ExecContext(ctx, del, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if n != 1 {
|
||||
return DocumentNotFoundError(key)
|
||||
}
|
||||
return err
|
||||
}
|
325
db_test.go
Normal file
325
db_test.go
Normal file
|
@ -0,0 +1,325 @@
|
|||
package gotelem
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/kschamplin/gotelem/internal/logparsers"
|
||||
"github.com/kschamplin/gotelem/skylab"
|
||||
)
|
||||
|
||||
// helper func to get a random bus event with random data.
|
||||
func GetRandomBusEvent() skylab.BusEvent {
|
||||
data := skylab.WsrVelocity{
|
||||
MotorVelocity: 1.0,
|
||||
VehicleVelocity: 4.0,
|
||||
}
|
||||
ev := skylab.BusEvent{
|
||||
Timestamp: time.Now(),
|
||||
Data: &data,
|
||||
}
|
||||
|
||||
return ev
|
||||
}
|
||||
|
||||
// exampleData is a telemetry log data snippet that
|
||||
// we use to seed the database.
|
||||
const exampleData = `1698013005.164 1455ED8FDBDFF4FC3BD
|
||||
1698013005.168 1460000000000000000
|
||||
1698013005.170 1470000000000000000
|
||||
1698013005.172 1610000000000000000
|
||||
1698013005.175 1210000000000000000
|
||||
1698013005.177 157FFFFC74200000000
|
||||
1698013005.181 1030000000000000000
|
||||
1698013005.184 1430000000000000000
|
||||
1698013005.187 04020D281405EA8FB41
|
||||
1698013005.210 0413BDF81406AF70042
|
||||
1698013005.212 042569F81408EF0FF41
|
||||
1698013005.215 04358A8814041060242
|
||||
1698013005.219 04481958140D2A40342
|
||||
1698013005.221 0452DB2814042990442
|
||||
1698013005.224 047AF948140C031FD41
|
||||
1698013005.226 04B27A081401ACD0B42
|
||||
1698013005.229 04DCEAA81403C8C0A42
|
||||
1698013005.283 04E0378814024580142
|
||||
1698013005.286 04F97908140BFBC0142
|
||||
1698013005.289 050098A81402F0F0A42
|
||||
1698013005.293 051E6AE81402AF20842
|
||||
1698013005.297 0521AC081403A970742
|
||||
1698013005.300 0535BB181403CEB0542
|
||||
1698013005.304 054ECC0814088FE0142
|
||||
1698013005.307 0554ED181401F44F341
|
||||
1698013005.309 05726E48140D42BEB41
|
||||
1698013005.312 059EFC98140EC400142
|
||||
`
|
||||
|
||||
// MakeMockDatabase creates a new dummy database.
|
||||
func MakeMockDatabase(name string) *TelemDb {
|
||||
fstring := fmt.Sprintf("file:%s?mode=memory&cache=shared", name)
|
||||
tdb, err := OpenRawDb(fstring)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return tdb
|
||||
}
|
||||
|
||||
func SeedMockDatabase(tdb *TelemDb) {
|
||||
// seed the database now.
|
||||
scanner := bufio.NewScanner(strings.NewReader(exampleData))
|
||||
|
||||
for scanner.Scan() {
|
||||
str := scanner.Text()
|
||||
|
||||
bev, err := logparsers.ParsersMap["telem"](str)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, err = tdb.AddEvents(bev)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GetSeedEvents() []skylab.BusEvent {
|
||||
evs := make([]skylab.BusEvent, 0)
|
||||
scanner := bufio.NewScanner(strings.NewReader(exampleData))
|
||||
|
||||
for scanner.Scan() {
|
||||
str := scanner.Text()
|
||||
|
||||
bev, err := logparsers.ParsersMap["telem"](str)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
evs = append(evs, bev)
|
||||
}
|
||||
return evs
|
||||
}
|
||||
|
||||
func TestTelemDb(t *testing.T) {
|
||||
|
||||
t.Run("test opening database", func(t *testing.T) {
|
||||
// create our mock
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
tdb.db.Ping()
|
||||
})
|
||||
|
||||
t.Run("test inserting bus event", func(t *testing.T) {
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
type args struct {
|
||||
events []skylab.BusEvent
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "add no packet",
|
||||
args: args{
|
||||
events: []skylab.BusEvent{},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "add single packet",
|
||||
args: args{
|
||||
events: []skylab.BusEvent{GetRandomBusEvent()},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "add multiple packet",
|
||||
args: args{
|
||||
events: []skylab.BusEvent{GetRandomBusEvent(), GetRandomBusEvent()},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if _, err := tdb.AddEvents(tt.args.events...); (err != nil) != tt.wantErr {
|
||||
t.Errorf("TelemDb.AddEvents() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
t.Run("test getting packets", func(t *testing.T) {
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
SeedMockDatabase(tdb)
|
||||
|
||||
ctx := context.Background()
|
||||
f := BusEventFilter{}
|
||||
limitMod := &LimitOffsetModifier{Limit: 1}
|
||||
pkt, err := tdb.GetPackets(ctx, f, limitMod)
|
||||
if err != nil {
|
||||
t.Fatalf("error getting packets: %v", err)
|
||||
}
|
||||
if len(pkt) != 1 {
|
||||
t.Fatalf("expected exactly one response, got %d", len(pkt))
|
||||
}
|
||||
// todo - validate what this should be.
|
||||
})
|
||||
|
||||
t.Run("test read-write packet", func(t *testing.T) {
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
func MockDocument(key string) json.RawMessage {
|
||||
var v = make(map[string]interface{})
|
||||
|
||||
v["identifier"] = map[string]string{"key": key}
|
||||
v["randomdata"] = rand.Int()
|
||||
res, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func TestDbDocuments(t *testing.T) {
|
||||
|
||||
t.Run("test inserting a document", func(t *testing.T) {
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
tdb.db.Ping()
|
||||
ctx := context.Background()
|
||||
err := tdb.AddDocument(ctx, MockDocument("hi"))
|
||||
if err != nil {
|
||||
t.Fatalf("AddDocument expected no error, got err=%v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("test inserting duplicate documents", func(t *testing.T) {
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
tdb.db.Ping()
|
||||
ctx := context.Background()
|
||||
doc := MockDocument("hi")
|
||||
err := tdb.AddDocument(ctx, doc)
|
||||
if err != nil {
|
||||
t.Fatalf("AddDocument expected no error, got err=%v", err)
|
||||
}
|
||||
|
||||
err = tdb.AddDocument(ctx, doc)
|
||||
if err == nil {
|
||||
t.Fatalf("AddDocument expected duplicate key error, got nil")
|
||||
}
|
||||
})
|
||||
t.Run("test inserting bad document", func(t *testing.T) {
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
tdb.db.Ping()
|
||||
ctx := context.Background()
|
||||
var badDoc = map[string]string{"bad": "duh"}
|
||||
msg, err := json.Marshal(badDoc)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
err = tdb.AddDocument(ctx, msg)
|
||||
|
||||
if err == nil {
|
||||
t.Fatalf("AddDocument expected error, got nil")
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
t.Run("test getting document", func(t *testing.T) {
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
tdb.db.Ping()
|
||||
ctx := context.Background()
|
||||
doc := MockDocument("hi")
|
||||
err := tdb.AddDocument(ctx, doc)
|
||||
if err != nil {
|
||||
t.Fatalf("AddDocument expected no error, got err=%v", err)
|
||||
}
|
||||
|
||||
res, err := tdb.GetDocument(ctx, "hi")
|
||||
if err != nil {
|
||||
t.Fatalf("GetDocument expected no error, got err=%v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(res, doc) {
|
||||
t.Fatalf("GetDocument did not return identical document")
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
t.Run("test getting nonexistent document", func(t *testing.T) {
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
tdb.db.Ping()
|
||||
ctx := context.Background()
|
||||
|
||||
res, err := tdb.GetDocument(ctx, "hi")
|
||||
|
||||
if err == nil || !errors.Is(err, DocumentNotFoundError("hi")) {
|
||||
t.Fatalf("GetDocument expected DocumentNotFoundError, got %v", err)
|
||||
}
|
||||
if res != nil {
|
||||
t.Fatalf("GetDocument expected nil result, got %v", res)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("test update document", func(t *testing.T) {
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
tdb.db.Ping()
|
||||
ctx := context.Background()
|
||||
doc1 := MockDocument("hi")
|
||||
doc2 := MockDocument("hi") // same key, we want to update.
|
||||
|
||||
tdb.AddDocument(ctx, doc1)
|
||||
err := tdb.UpdateDocument(ctx, "hi", doc2)
|
||||
if err != nil {
|
||||
t.Fatalf("UpdateDocument expected no error, got err=%v", err)
|
||||
}
|
||||
|
||||
// compare.
|
||||
res, _ := tdb.GetDocument(ctx, "hi")
|
||||
if !reflect.DeepEqual(res, doc2) {
|
||||
t.Fatalf("UpdateDocument did not return new doc, got %s", res)
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
t.Run("test update nonexistent document", func(t *testing.T) {
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
tdb.db.Ping()
|
||||
ctx := context.Background()
|
||||
doc := MockDocument("hi")
|
||||
err := tdb.UpdateDocument(ctx, "badKey", doc)
|
||||
if err == nil {
|
||||
t.Fatalf("UpdateDocument expected error, got nil")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("test delete document", func(t *testing.T) {
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
tdb.db.Ping()
|
||||
ctx := context.Background()
|
||||
doc := MockDocument("hi")
|
||||
tdb.AddDocument(ctx, doc)
|
||||
err := tdb.DeleteDocument(ctx, "hi")
|
||||
if err != nil {
|
||||
t.Fatalf("DeleteDocument expected no error, got err=%v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("test delete nonexistent document", func(t *testing.T) {
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
tdb.db.Ping()
|
||||
ctx := context.Background()
|
||||
err := tdb.DeleteDocument(ctx, "hi")
|
||||
if !errors.Is(err, DocumentNotFoundError("hi")) {
|
||||
t.Fatalf("DeleteDocument expected not found, got err=%v", err)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package api
|
||||
package gotelem
|
||||
|
||||
// this file defines the HTTP handlers and routes.
|
||||
|
||||
|
@ -6,26 +6,95 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"log/slog"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/go-chi/chi/v5/middleware"
|
||||
"github.com/google/uuid"
|
||||
"github.com/kschamplin/gotelem"
|
||||
"github.com/kschamplin/gotelem/internal/db"
|
||||
"github.com/kschamplin/gotelem/skylab"
|
||||
"nhooyr.io/websocket"
|
||||
"nhooyr.io/websocket/wsjson"
|
||||
)
|
||||
|
||||
func TelemRouter(log *slog.Logger, broker *gotelem.Broker, db *db.TelemDb) http.Handler {
|
||||
func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
|
||||
|
||||
bef := &BusEventFilter{}
|
||||
|
||||
v := r.URL.Query()
|
||||
if v.Has("name") {
|
||||
bef.Names = v["name"]
|
||||
}
|
||||
|
||||
if el := v.Get("start"); el != "" {
|
||||
// parse the start time query.
|
||||
t, err := time.Parse(time.RFC3339, el)
|
||||
if err != nil {
|
||||
return bef, err
|
||||
}
|
||||
bef.StartTime = t
|
||||
}
|
||||
if el := v.Get("end"); el != "" {
|
||||
// parse the start time query.
|
||||
t, err := time.Parse(time.RFC3339, el)
|
||||
if err != nil {
|
||||
return bef, err
|
||||
}
|
||||
bef.EndTime = t
|
||||
}
|
||||
if v.Has("idx") {
|
||||
|
||||
bef.Indexes = make([]int, 0)
|
||||
for _, strIdx := range v["idx"] {
|
||||
idx, err := strconv.ParseInt(strIdx, 10, 32)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bef.Indexes = append(bef.Indexes, int(idx))
|
||||
}
|
||||
}
|
||||
return bef, nil
|
||||
}
|
||||
|
||||
func extractLimitModifier(r *http.Request) (*LimitOffsetModifier, error) {
|
||||
lim := &LimitOffsetModifier{}
|
||||
v := r.URL.Query()
|
||||
if el := v.Get("limit"); el != "" {
|
||||
val, err := strconv.ParseInt(el, 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lim.Limit = int(val)
|
||||
// next, we check if we have an offset.
|
||||
// we only check offset if we also have a limit.
|
||||
// offset without limit isn't valid and is ignored.
|
||||
if el := v.Get("offset"); el != "" {
|
||||
val, err := strconv.ParseInt(el, 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lim.Offset = int(val)
|
||||
}
|
||||
return lim, nil
|
||||
}
|
||||
// we use the nil case to indicate that no limit was provided.
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type RouterMod func(chi.Router)
|
||||
|
||||
var RouterMods = []RouterMod{}
|
||||
|
||||
func TelemRouter(log *slog.Logger, broker *Broker, db *TelemDb) http.Handler {
|
||||
r := chi.NewRouter()
|
||||
|
||||
r.Use(middleware.RequestID)
|
||||
r.Use(middleware.RealIP)
|
||||
r.Use(middleware.Logger) // TODO: integrate with slog instead of go default logger.
|
||||
r.Use(middleware.Recoverer)
|
||||
r.Use(middleware.SetHeader("Access-Control-Allow-Origin", "*"))
|
||||
|
||||
// heartbeat request.
|
||||
r.Get("/ping", func(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -34,6 +103,9 @@ func TelemRouter(log *slog.Logger, broker *gotelem.Broker, db *db.TelemDb) http.
|
|||
|
||||
r.Mount("/api/v1", apiV1(broker, db))
|
||||
|
||||
for _, mod := range RouterMods {
|
||||
mod(r)
|
||||
}
|
||||
// To future residents - you can add new API calls/systems in /api/v2
|
||||
// Don't break anything in api v1! keep legacy code working!
|
||||
|
||||
|
@ -41,7 +113,7 @@ func TelemRouter(log *slog.Logger, broker *gotelem.Broker, db *db.TelemDb) http.
|
|||
}
|
||||
|
||||
// define API version 1 routes.
|
||||
func apiV1(broker *gotelem.Broker, tdb *db.TelemDb) chi.Router {
|
||||
func apiV1(broker *Broker, tdb *TelemDb) chi.Router {
|
||||
r := chi.NewRouter()
|
||||
// this API only accepts JSON.
|
||||
r.Use(middleware.AllowContentType("application/json"))
|
||||
|
@ -56,15 +128,19 @@ func apiV1(broker *gotelem.Broker, tdb *db.TelemDb) chi.Router {
|
|||
})
|
||||
|
||||
r.Route("/packets", func(r chi.Router) {
|
||||
r.Get("/subscribe", apiV1PacketSubscribe(broker, tdb))
|
||||
r.Get("/subscribe", apiV1PacketSubscribe(broker))
|
||||
r.Post("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
var pkgs []skylab.BusEvent
|
||||
var pkts []skylab.BusEvent
|
||||
decoder := json.NewDecoder(r.Body)
|
||||
if err := decoder.Decode(&pkgs); err != nil {
|
||||
if err := decoder.Decode(&pkts); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
tdb.AddEvents(pkgs...)
|
||||
conn_id := r.RemoteAddr + uuid.NewString()
|
||||
for _, pkt := range pkts {
|
||||
broker.Publish(conn_id, pkt)
|
||||
}
|
||||
tdb.AddEventsCtx(r.Context(), pkts...)
|
||||
})
|
||||
// general packet history get.
|
||||
r.Get("/", apiV1GetPackets(tdb))
|
||||
|
@ -74,15 +150,10 @@ func apiV1(broker *gotelem.Broker, tdb *db.TelemDb) chi.Router {
|
|||
|
||||
})
|
||||
|
||||
// records are driving segments/runs.
|
||||
r.Route("/records", func(r chi.Router) {
|
||||
r.Get("/", apiV1GetRecords(tdb)) // get all runs
|
||||
r.Get("/active", apiV1GetActiveRecord(tdb)) // get current run (no end time)
|
||||
r.Post("/", apiV1StartRecord(tdb)) // create a new run (with note). Ends active run if any, and creates new active run (no end time)
|
||||
r.Get("/{id}", apiV1GetRecord(tdb)) // get details on a specific run
|
||||
r.Put("/{id}", apiV1UpdateRecord(tdb)) // update a specific run. Can only be used to add notes/metadata, and not to change time/id.
|
||||
// OpenMCT domain object storage. Basically an arbitrary JSON document store
|
||||
r.Route("/openmct", apiV1OpenMCTStore(tdb))
|
||||
|
||||
})
|
||||
// records are driving segments/runs.
|
||||
|
||||
r.Get("/stats", func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
|
@ -91,9 +162,8 @@ func apiV1(broker *gotelem.Broker, tdb *db.TelemDb) chi.Router {
|
|||
return r
|
||||
}
|
||||
|
||||
|
||||
// this is a websocket stream.
|
||||
func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFunc {
|
||||
func apiV1PacketSubscribe(broker *Broker) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
// pull filter from url query params.
|
||||
bef, err := extractBusEventFilter(r)
|
||||
|
@ -101,7 +171,7 @@ func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFu
|
|||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
// setup connection
|
||||
conn_id := r.RemoteAddr + uuid.New().String()
|
||||
conn_id := r.RemoteAddr + uuid.NewString()
|
||||
sub, err := broker.Subscribe(conn_id)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
|
@ -111,19 +181,18 @@ func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFu
|
|||
defer broker.Unsubscribe(conn_id)
|
||||
|
||||
// setup websocket
|
||||
c, err := websocket.Accept(w, r, nil)
|
||||
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
|
||||
InsecureSkipVerify: true,
|
||||
})
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
c.Ping(r.Context())
|
||||
// closeread handles protocol/status messages,
|
||||
// also handles clients closing the connection.
|
||||
// we get a context to use from it.
|
||||
ctx := c.CloseRead(r.Context())
|
||||
|
||||
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -148,7 +217,7 @@ func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFu
|
|||
}
|
||||
}
|
||||
|
||||
func apiV1GetPackets(tdb *db.TelemDb) http.HandlerFunc {
|
||||
func apiV1GetPackets(tdb *TelemDb) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
// this should use http query params to return a list of packets.
|
||||
bef, err := extractBusEventFilter(r)
|
||||
|
@ -163,21 +232,11 @@ func apiV1GetPackets(tdb *db.TelemDb) http.HandlerFunc {
|
|||
return
|
||||
}
|
||||
|
||||
// TODO: is the following check needed?
|
||||
var res []skylab.BusEvent
|
||||
if lim != nil {
|
||||
res, err = tdb.GetPackets(r.Context(), *bef, lim)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
} else {
|
||||
res, err = tdb.GetPackets(r.Context(), *bef)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
res, err = tdb.GetPackets(r.Context(), *bef, lim)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
b, err := json.Marshal(res)
|
||||
if err != nil {
|
||||
|
@ -192,7 +251,7 @@ func apiV1GetPackets(tdb *db.TelemDb) http.HandlerFunc {
|
|||
// apiV1GetValues is a function that creates a handler for
|
||||
// getting the specific value from a packet.
|
||||
// this is useful for OpenMCT or other viewer APIs
|
||||
func apiV1GetValues(db *db.TelemDb) http.HandlerFunc {
|
||||
func apiV1GetValues(db *TelemDb) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
var err error
|
||||
|
||||
|
@ -214,7 +273,9 @@ func apiV1GetValues(db *db.TelemDb) http.HandlerFunc {
|
|||
// override the bus event filter name option
|
||||
bef.Names = []string{name}
|
||||
|
||||
res, err := db.GetValues(r.Context(), *bef, field, lim)
|
||||
var res []Datum
|
||||
// make the call, skip the limit modifier if it's nil.
|
||||
res, err = db.GetValues(r.Context(), *bef, field, lim)
|
||||
if err != nil {
|
||||
// 500 server error:
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
|
@ -230,27 +291,15 @@ func apiV1GetValues(db *db.TelemDb) http.HandlerFunc {
|
|||
|
||||
}
|
||||
|
||||
// TODO: rename. record is not a clear name. Runs? drives? segments?
|
||||
func apiV1GetRecords(db *db.TelemDb) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
func apiV1OpenMCTStore(db *TelemDb) func(chi.Router) {
|
||||
return func(r chi.Router) {
|
||||
// key is a column on our json store, it's nested under identifier.key
|
||||
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
|
||||
// create a new object.
|
||||
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
|
||||
// subscribe to object updates.
|
||||
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
|
||||
}
|
||||
}
|
||||
|
||||
func apiV1GetActiveRecord(db *db.TelemDb) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func apiV1StartRecord(db *db.TelemDb) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {}
|
||||
}
|
||||
|
||||
func apiV1GetRecord(db *db.TelemDb) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {}
|
||||
}
|
||||
|
||||
func apiV1UpdateRecord(db *db.TelemDb) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {}
|
||||
}
|
217
http_test.go
Normal file
217
http_test.go
Normal file
|
@ -0,0 +1,217 @@
|
|||
package gotelem
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/kschamplin/gotelem/skylab"
|
||||
)
|
||||
|
||||
func Test_extractBusEventFilter(t *testing.T) {
|
||||
makeReq := func(path string) *http.Request {
|
||||
return httptest.NewRequest(http.MethodGet, path, nil)
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
req *http.Request
|
||||
want *BusEventFilter
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "test no extractions",
|
||||
req: makeReq("http://localhost/"),
|
||||
want: &BusEventFilter{},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "test single name extract",
|
||||
req: makeReq("http://localhost/?name=hi"),
|
||||
want: &BusEventFilter{
|
||||
Names: []string{"hi"},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "test multi name extract",
|
||||
req: makeReq("http://localhost/?name=hi1&name=hi2"),
|
||||
want: &BusEventFilter{
|
||||
Names: []string{"hi1", "hi2"},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "test start time valid extract",
|
||||
req: makeReq(fmt.Sprintf("http://localhost/?start=%s", url.QueryEscape(time.Unix(160000000, 0).Format(time.RFC3339)))),
|
||||
want: &BusEventFilter{
|
||||
StartTime: time.Unix(160000000, 0),
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
// {
|
||||
// name: "test start time invalid extract",
|
||||
// req: makeReq(fmt.Sprintf("http://localhost/?start=%s", url.QueryEscape("ajlaskdj"))),
|
||||
// wantErr: true,
|
||||
// },
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Logf("Testing URL %s", tt.req.URL.String())
|
||||
got, err := extractBusEventFilter(tt.req)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("extractBusEventFilter() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
// we have to manually compare fields because timestamps can't be deeply compared.
|
||||
if !reflect.DeepEqual(got.Names, tt.want.Names) {
|
||||
t.Errorf("extractBusEventFilter() Names bad = %v, want %v", got.Names, tt.want.Names)
|
||||
}
|
||||
if !reflect.DeepEqual(got.Indexes, tt.want.Indexes) {
|
||||
t.Errorf("extractBusEventFilter() Indexes bad = %v, want %v", got.Indexes, tt.want.Indexes)
|
||||
}
|
||||
if !got.StartTime.Equal(tt.want.StartTime) {
|
||||
t.Errorf("extractBusEventFilter() StartTime mismatch = %v, want %v", got.StartTime, tt.want.StartTime)
|
||||
}
|
||||
if !got.EndTime.Equal(tt.want.EndTime) {
|
||||
t.Errorf("extractBusEventFilter() EndTime mismatch = %v, want %v", got.EndTime, tt.want.EndTime)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_extractLimitModifier(t *testing.T) {
|
||||
makeReq := func(path string) *http.Request {
|
||||
return httptest.NewRequest(http.MethodGet, path, nil)
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
req *http.Request
|
||||
want *LimitOffsetModifier
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "test no limit/offset",
|
||||
req: makeReq("http://localhost/"),
|
||||
want: nil,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "test limit, no offset",
|
||||
req: makeReq("http://localhost/?limit=10"),
|
||||
want: &LimitOffsetModifier{Limit: 10},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "test limit and offset",
|
||||
req: makeReq("http://localhost/?limit=100&offset=200"),
|
||||
want: &LimitOffsetModifier{Limit: 100, Offset: 200},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "test only offset",
|
||||
req: makeReq("http://localhost/?&offset=200"),
|
||||
want: nil,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "test bad limit",
|
||||
req: makeReq("http://localhost/?limit=aaaa"),
|
||||
want: nil,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "test good limit, bad offset",
|
||||
req: makeReq("http://localhost/?limit=10&offset=jjjj"),
|
||||
want: nil,
|
||||
wantErr: true,
|
||||
},
|
||||
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := extractLimitModifier(tt.req)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("extractLimitModifier() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("extractLimitModifier() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_ApiV1GetPackets(t *testing.T) {
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
SeedMockDatabase(tdb)
|
||||
evs := GetSeedEvents()
|
||||
handler := apiV1GetPackets(tdb)
|
||||
|
||||
tests := []struct{
|
||||
name string
|
||||
req *http.Request
|
||||
statusCode int
|
||||
expectedResults []skylab.BusEvent
|
||||
}{
|
||||
{
|
||||
name: "get all packets test",
|
||||
req: httptest.NewRequest(http.MethodGet, "http://localhost/", nil),
|
||||
statusCode: http.StatusOK,
|
||||
expectedResults: evs,
|
||||
},
|
||||
{
|
||||
name: "filter name test",
|
||||
req: httptest.NewRequest(http.MethodGet, "http://localhost/?name=bms_module", nil),
|
||||
statusCode: http.StatusOK,
|
||||
expectedResults: func() []skylab.BusEvent {
|
||||
filtered := make([]skylab.BusEvent, 0)
|
||||
for _, pkt := range evs {
|
||||
if pkt.Name == "bms_module" {
|
||||
filtered = append(filtered, pkt)
|
||||
}
|
||||
}
|
||||
return filtered
|
||||
}(),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// construct the recorder
|
||||
w := httptest.NewRecorder()
|
||||
handler(w, tt.req)
|
||||
|
||||
resp := w.Result()
|
||||
|
||||
if tt.statusCode != resp.StatusCode {
|
||||
t.Errorf("incorrect status code: expected %d got %d", tt.statusCode, resp.StatusCode)
|
||||
}
|
||||
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
var resultEvents []skylab.BusEvent
|
||||
err := decoder.Decode(&resultEvents)
|
||||
if err != nil {
|
||||
t.Fatalf("could not parse JSON response: %v", err)
|
||||
}
|
||||
|
||||
if len(resultEvents) != len(tt.expectedResults) {
|
||||
t.Fatalf("response length did not match, want %d got %d", len(tt.expectedResults), len(resultEvents))
|
||||
}
|
||||
|
||||
// Note, the results are flipped here. We return earliest first.
|
||||
for idx := range tt.expectedResults {
|
||||
expected := tt.expectedResults[idx]
|
||||
actual := resultEvents[len(resultEvents) - 1 - idx]
|
||||
if !expected.Equals(&actual) {
|
||||
t.Errorf("packet did not match, want %v got %v", expected, actual)
|
||||
}
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
}
|
|
@ -1,59 +0,0 @@
|
|||
package api
|
||||
// This file contains common behaviors that are used across various requests
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/kschamplin/gotelem/internal/db"
|
||||
)
|
||||
|
||||
func extractBusEventFilter(r *http.Request) (*db.BusEventFilter, error) {
|
||||
|
||||
bef := &db.BusEventFilter{}
|
||||
|
||||
v := r.URL.Query()
|
||||
bef.Names = v["name"] // put all the names in.
|
||||
if el := v.Get("start"); el != "" {
|
||||
// parse the start time query.
|
||||
t, err := time.Parse(time.RFC3339, el)
|
||||
if err != nil {
|
||||
return bef, err
|
||||
}
|
||||
bef.TimerangeStart = t
|
||||
}
|
||||
if el := v.Get("end"); el != "" {
|
||||
// parse the start time query.
|
||||
t, err := time.Parse(time.RFC3339, el)
|
||||
if err != nil {
|
||||
return bef, err
|
||||
}
|
||||
bef.TimerangeStart = t
|
||||
}
|
||||
return bef, nil
|
||||
}
|
||||
|
||||
func extractLimitModifier(r *http.Request) (*db.LimitOffsetModifier, error) {
|
||||
lim := &db.LimitOffsetModifier{}
|
||||
v := r.URL.Query()
|
||||
if el := v.Get("limit"); el != "" {
|
||||
val, err := strconv.ParseInt(el, 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lim.Limit = int(val)
|
||||
// next, we check if we have an offset.
|
||||
// we only check offset if we also have a limit.
|
||||
// offset without limit isn't valid and is ignored.
|
||||
if el := v.Get("offset"); el != "" {
|
||||
val, err := strconv.ParseInt(el, 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lim.Offset = int(val)
|
||||
}
|
||||
return lim, nil
|
||||
}
|
||||
// we use the nil case to indicate that no limit was provided.
|
||||
return nil, nil
|
||||
}
|
|
@ -6,23 +6,22 @@
|
|||
// by writing "adapters" to various devices/formats (xbee, socketcan)
|
||||
package can
|
||||
|
||||
|
||||
type CanID struct {
|
||||
Id uint32
|
||||
Id uint32
|
||||
Extended bool // since the id itself is not enough.
|
||||
}
|
||||
|
||||
// Frame represents a protocol-agnostic CAN frame. The Id can be standard or extended,
|
||||
// but if it is extended, the Kind should be EFF.
|
||||
type Frame struct {
|
||||
Id CanID
|
||||
Id CanID
|
||||
Data []byte
|
||||
Kind Kind
|
||||
}
|
||||
|
||||
|
||||
// TODO: should this be replaced
|
||||
type CANFrame interface {
|
||||
Id()
|
||||
Id()
|
||||
Data() []byte
|
||||
Type() Kind
|
||||
}
|
||||
|
@ -34,8 +33,8 @@ type Kind uint8
|
|||
|
||||
const (
|
||||
CanDataFrame Kind = iota // Standard ID Frame
|
||||
CanRTRFrame // Remote Transmission Request Frame
|
||||
CanErrFrame // Error Frame
|
||||
CanRTRFrame // Remote Transmission Request Frame
|
||||
CanErrFrame // Error Frame
|
||||
)
|
||||
|
||||
// CanFilter is a basic filter for masking out data. It has an Inverted flag
|
||||
|
|
|
@ -1,152 +0,0 @@
|
|||
package db
|
||||
|
||||
// this file implements the database functions to load/store/read from a sql database.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/kschamplin/gotelem/skylab"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
|
||||
type TelemDb struct {
|
||||
db *sqlx.DB
|
||||
}
|
||||
|
||||
// TelemDbOption lets you customize the behavior of the sqlite database
|
||||
type TelemDbOption func(*TelemDb) error
|
||||
|
||||
|
||||
// this function is internal use. It actually opens the database, but uses
|
||||
// a raw path string instead of formatting one like the exported functions.
|
||||
func OpenRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err error) {
|
||||
tdb = &TelemDb{}
|
||||
tdb.db, err = sqlx.Connect("sqlite3", rawpath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, fn := range options {
|
||||
err = fn(tdb)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// perform any database migrations
|
||||
version, err := tdb.GetVersion()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// TODO: use logging instead of printf
|
||||
fmt.Printf("starting version %d\n", version)
|
||||
|
||||
version, err = RunMigrations(tdb)
|
||||
fmt.Printf("ending version %d\n", version)
|
||||
|
||||
return tdb, err
|
||||
}
|
||||
|
||||
|
||||
// this string is used to open the read-write db.
|
||||
// the extra options improve performance significantly.
|
||||
const ProductionDbURI = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000"
|
||||
|
||||
// OpenTelemDb opens a new telemetry database at the given path.
|
||||
func OpenTelemDb(path string, options ...TelemDbOption) (*TelemDb, error) {
|
||||
dbStr := fmt.Sprintf(ProductionDbURI, path)
|
||||
return OpenRawDb(dbStr, options...)
|
||||
}
|
||||
|
||||
func (tdb *TelemDb) GetVersion() (int, error) {
|
||||
var version int
|
||||
err := tdb.db.Get(&version, "PRAGMA user_version")
|
||||
return version, err
|
||||
}
|
||||
|
||||
func (tdb *TelemDb) SetVersion(version int) error {
|
||||
stmt := fmt.Sprintf("PRAGMA user_version = %d", version)
|
||||
_, err := tdb.db.Exec(stmt)
|
||||
return err
|
||||
}
|
||||
|
||||
// sql expression to insert a bus event into the packets database.1
|
||||
const sqlInsertEvent =`INSERT INTO "bus_events" (ts, name, data) VALUES `
|
||||
|
||||
// AddEvent adds the bus event to the database.
|
||||
func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) {
|
||||
// edge case - zero events.
|
||||
if len(events) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
n = 0
|
||||
tx, err := tdb.db.BeginTx(ctx, nil)
|
||||
defer tx.Rollback()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
sqlStmt := sqlInsertEvent
|
||||
const rowSql = "(?, ?, json(?))"
|
||||
inserts := make([]string, len(events))
|
||||
vals := []interface{}{}
|
||||
idx := 0 // we have to manually increment, because sometimes we don't insert.
|
||||
for _, b := range events {
|
||||
inserts[idx] = rowSql
|
||||
var j []byte
|
||||
j, err = json.Marshal(b.Data)
|
||||
|
||||
if err != nil {
|
||||
// we had some error turning the packet into json.
|
||||
continue // we silently skip.
|
||||
}
|
||||
|
||||
vals = append(vals, b.Timestamp.UnixMilli(), b.Data.String(), j)
|
||||
idx++
|
||||
}
|
||||
|
||||
// construct the full statement now
|
||||
sqlStmt = sqlStmt + strings.Join(inserts[:idx], ",")
|
||||
stmt, err := tx.PrepareContext(ctx, sqlStmt)
|
||||
// defer stmt.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
res, err := stmt.ExecContext(ctx, vals...)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n, err = res.RowsAffected()
|
||||
|
||||
tx.Commit()
|
||||
return
|
||||
}
|
||||
|
||||
func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) {
|
||||
|
||||
return tdb.AddEventsCtx(context.Background(), events...)
|
||||
}
|
||||
|
||||
|
||||
// GetActiveDrive finds the non-null drive and returns it, if any.
|
||||
func (tdb *TelemDb) GetActiveDrive() (res int, err error) {
|
||||
err = tdb.db.Get(&res, "SELECT id FROM drive_records WHERE end_time IS NULL LIMIT 1")
|
||||
return
|
||||
}
|
||||
|
||||
func (tdb *TelemDb) NewDrive(start time.Time, note string) {
|
||||
|
||||
}
|
||||
|
||||
func (tdb *TelemDb) EndDrive() {
|
||||
|
||||
}
|
||||
|
||||
func (tdb *TelemDb) UpdateDrive(id int, note string) {
|
||||
|
||||
}
|
|
@ -1,156 +0,0 @@
|
|||
package db
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/kschamplin/gotelem/internal/logparsers"
|
||||
"github.com/kschamplin/gotelem/skylab"
|
||||
)
|
||||
|
||||
// helper func to get a random bus event with random data.
|
||||
func GetRandomBusEvent() skylab.BusEvent {
|
||||
data := skylab.WsrVelocity{
|
||||
MotorVelocity: 1.0,
|
||||
VehicleVelocity: 4.0,
|
||||
}
|
||||
ev := skylab.BusEvent{
|
||||
Timestamp: time.Now(),
|
||||
Data: &data,
|
||||
}
|
||||
|
||||
return ev
|
||||
}
|
||||
|
||||
// exampleData is a telemetry log data snippet that
|
||||
// we use to seed the database.
|
||||
const exampleData = `1698013005.164 1455ED8FDBDFF4FC3BD
|
||||
1698013005.168 1460000000000000000
|
||||
1698013005.170 1470000000000000000
|
||||
1698013005.172 1610000000000000000
|
||||
1698013005.175 1210000000000000000
|
||||
1698013005.177 157FFFFC74200000000
|
||||
1698013005.181 1030000000000000000
|
||||
1698013005.184 1430000000000000000
|
||||
1698013005.187 04020D281405EA8FB41
|
||||
1698013005.210 0413BDF81406AF70042
|
||||
1698013005.212 042569F81408EF0FF41
|
||||
1698013005.215 04358A8814041060242
|
||||
1698013005.219 04481958140D2A40342
|
||||
1698013005.221 0452DB2814042990442
|
||||
1698013005.224 047AF948140C031FD41
|
||||
1698013005.226 04B27A081401ACD0B42
|
||||
1698013005.229 04DCEAA81403C8C0A42
|
||||
1698013005.283 04E0378814024580142
|
||||
1698013005.286 04F97908140BFBC0142
|
||||
1698013005.289 050098A81402F0F0A42
|
||||
1698013005.293 051E6AE81402AF20842
|
||||
1698013005.297 0521AC081403A970742
|
||||
1698013005.300 0535BB181403CEB0542
|
||||
1698013005.304 054ECC0814088FE0142
|
||||
1698013005.307 0554ED181401F44F341
|
||||
1698013005.309 05726E48140D42BEB41
|
||||
1698013005.312 059EFC98140EC400142
|
||||
`
|
||||
|
||||
// MakeMockDatabase creates a new dummy database.
|
||||
func MakeMockDatabase(name string) *TelemDb {
|
||||
fstring := fmt.Sprintf("file:%s?mode=memory&cache=shared", name)
|
||||
tdb, err := OpenRawDb(fstring)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// seed the database now.
|
||||
scanner := bufio.NewScanner(strings.NewReader(exampleData))
|
||||
|
||||
for scanner.Scan() {
|
||||
str := scanner.Text()
|
||||
|
||||
bev, err := logparsers.ParsersMap["telem"](str)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, err = tdb.AddEvents(bev)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
return tdb
|
||||
}
|
||||
|
||||
func TestTelemDb(t *testing.T) {
|
||||
|
||||
|
||||
t.Run("test opening database", func(t *testing.T) {
|
||||
// create our mock
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
tdb.db.Ping()
|
||||
})
|
||||
|
||||
t.Run("test inserting bus event", func(t *testing.T) {
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
type args struct {
|
||||
events []skylab.BusEvent
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "add no packet",
|
||||
args: args{
|
||||
events: []skylab.BusEvent{},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "add single packet",
|
||||
args: args{
|
||||
events: []skylab.BusEvent{GetRandomBusEvent()},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "add multiple packet",
|
||||
args: args{
|
||||
events: []skylab.BusEvent{GetRandomBusEvent(), GetRandomBusEvent()},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if _, err := tdb.AddEvents(tt.args.events...); (err != nil) != tt.wantErr {
|
||||
t.Errorf("TelemDb.AddEvents() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
t.Run("test getting packets", func(t *testing.T) {
|
||||
tdb := MakeMockDatabase(t.Name())
|
||||
|
||||
ctx := context.Background()
|
||||
f := BusEventFilter{}
|
||||
limitMod := LimitOffsetModifier{Limit: 1}
|
||||
pkt, err := tdb.GetPackets(ctx, f, limitMod)
|
||||
if err != nil {
|
||||
t.Fatalf("error getting packets: %v", err)
|
||||
}
|
||||
if len(pkt) != 1 {
|
||||
t.Fatalf("expected exactly one response, got %d", len(pkt))
|
||||
}
|
||||
// todo - validate what this should be.
|
||||
})
|
||||
|
||||
t.Run("test read-write packet", func(t *testing.T) {
|
||||
|
||||
})
|
||||
}
|
|
@ -1,172 +0,0 @@
|
|||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/kschamplin/gotelem/skylab"
|
||||
)
|
||||
|
||||
// Modifier augments SQL strings.
|
||||
type Modifier interface {
|
||||
ModifyStatement(*strings.Builder) error
|
||||
}
|
||||
|
||||
// LimitOffsetModifier is a modifier to support pagniation.
|
||||
type LimitOffsetModifier struct {
|
||||
Limit int
|
||||
Offset int
|
||||
}
|
||||
|
||||
func (l LimitOffsetModifier) ModifyStatement(sb *strings.Builder) error {
|
||||
clause := fmt.Sprintf(" LIMIT %d OFFSET %d", l.Limit, l.Offset)
|
||||
sb.WriteString(clause)
|
||||
return nil
|
||||
}
|
||||
|
||||
// BusEventFilter is a filter for bus events.
|
||||
type BusEventFilter struct {
|
||||
Names []string
|
||||
TimerangeStart time.Time
|
||||
TimerangeEnd time.Time
|
||||
}
|
||||
|
||||
// now we can optionally add a limit.
|
||||
|
||||
func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, options ...Modifier) ([]skylab.BusEvent, error) {
|
||||
// construct a simple
|
||||
var whereFrags = make([]string, 0)
|
||||
|
||||
// if we're filtering by names, add a where clause for it.
|
||||
if len(filter.Names) > 0 {
|
||||
names := strings.Join(filter.Names, ", ")
|
||||
qString := fmt.Sprintf("name IN (%s)", names)
|
||||
whereFrags = append(whereFrags, qString)
|
||||
}
|
||||
// TODO: identify if we need a special case for both time ranges
|
||||
// using BETWEEN since apparenlty that can be better?
|
||||
|
||||
// next, check if we have a start/end time, add constraints
|
||||
if !filter.TimerangeEnd.IsZero() {
|
||||
qString := fmt.Sprintf("ts <= %d", filter.TimerangeEnd.UnixMilli())
|
||||
whereFrags = append(whereFrags, qString)
|
||||
}
|
||||
if !filter.TimerangeStart.IsZero() {
|
||||
// we have an end range
|
||||
qString := fmt.Sprintf("ts >= %d", filter.TimerangeStart.UnixMilli())
|
||||
whereFrags = append(whereFrags, qString)
|
||||
}
|
||||
|
||||
sb := strings.Builder{}
|
||||
sb.WriteString("SELECT * from \"bus_events\"")
|
||||
// construct the full statement.
|
||||
if len(whereFrags) > 0 {
|
||||
// use the where clauses.
|
||||
sb.WriteString(" WHERE ")
|
||||
sb.WriteString(strings.Join(whereFrags, " AND "))
|
||||
}
|
||||
|
||||
// Augment our data further if there's i.e a limit modifier.
|
||||
// TODO: factor this out maybe?
|
||||
for _, m := range options {
|
||||
m.ModifyStatement(&sb)
|
||||
}
|
||||
rows, err := tdb.db.QueryxContext(ctx, sb.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var events = make([]skylab.BusEvent, 0, 10)
|
||||
|
||||
for rows.Next() {
|
||||
var ev skylab.RawJsonEvent
|
||||
err := rows.Scan(&ev.Timestamp, &ev.Name, (*[]byte)(&ev.Data))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
BusEv := skylab.BusEvent{
|
||||
Timestamp: time.UnixMilli(int64(ev.Timestamp)),
|
||||
Name: ev.Name,
|
||||
}
|
||||
BusEv.Data, err = skylab.FromJson(ev.Name, ev.Data)
|
||||
if err != nil {
|
||||
return events, nil
|
||||
}
|
||||
events = append(events, BusEv)
|
||||
}
|
||||
|
||||
err = rows.Err()
|
||||
|
||||
return events, err
|
||||
}
|
||||
|
||||
// We now need a different use-case: we would like to extract a value from
|
||||
// a specific packet.
|
||||
|
||||
// Datum is a single measurement - it is more granular than a packet.
|
||||
// the classic example is bms_measurement.current
|
||||
type Datum struct {
|
||||
Timestamp time.Time `db:"timestamp"`
|
||||
Value any `db:"val"`
|
||||
}
|
||||
|
||||
// GetValues queries the database for values in a given time range.
|
||||
// A value is a specific data point. For example, bms_measurement.current
|
||||
// would be a value.
|
||||
func (tdb *TelemDb) GetValues(ctx context.Context, bef BusEventFilter,
|
||||
field string, opts ...Modifier) ([]Datum, error) {
|
||||
// this fragment uses json_extract from sqlite to get a single
|
||||
// nested value.
|
||||
sb := strings.Builder{}
|
||||
sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `)
|
||||
if len(bef.Names) != 1 {
|
||||
return nil, errors.New("invalid number of names")
|
||||
}
|
||||
|
||||
qStrings := []string{"name is ?"}
|
||||
// add timestamp limit.
|
||||
if !bef.TimerangeStart.IsZero() {
|
||||
qString := fmt.Sprintf("ts >= %d", bef.TimerangeStart.UnixMilli())
|
||||
qStrings = append(qStrings, qString)
|
||||
}
|
||||
|
||||
if !bef.TimerangeEnd.IsZero() {
|
||||
qString := fmt.Sprintf("ts <= %d", bef.TimerangeEnd.UnixMilli())
|
||||
qStrings = append(qStrings, qString)
|
||||
}
|
||||
// join qstrings with AND
|
||||
sb.WriteString(strings.Join(qStrings, " AND "))
|
||||
|
||||
for _, m := range opts {
|
||||
if m == nil {
|
||||
continue
|
||||
}
|
||||
m.ModifyStatement(&sb)
|
||||
}
|
||||
rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, bef.Names[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
data := make([]Datum, 0, 10)
|
||||
for rows.Next() {
|
||||
var d Datum = Datum{}
|
||||
var ts int64
|
||||
err = rows.Scan(&ts, &d.Value)
|
||||
d.Timestamp = time.UnixMilli(ts)
|
||||
|
||||
if err != nil {
|
||||
fmt.Print(err)
|
||||
return data, err
|
||||
}
|
||||
data = append(data, d)
|
||||
}
|
||||
fmt.Print(rows.Err())
|
||||
|
||||
return data, nil
|
||||
}
|
|
@ -1,43 +0,0 @@
|
|||
package db
|
||||
// This file implements Packet modelling, which allows us to look up fields by name
|
||||
|
||||
type PacketDef struct {
|
||||
Name string
|
||||
Description string
|
||||
Id int
|
||||
}
|
||||
|
||||
type FieldDef struct {
|
||||
Name string
|
||||
SubName string
|
||||
Packet string
|
||||
Type string
|
||||
}
|
||||
|
||||
// PacketNotFoundError is when a matching packet cannot be found.
|
||||
type PacketNotFoundError string
|
||||
|
||||
func (e *PacketNotFoundError) Error() string {
|
||||
return "packet not found: " + string(*e)
|
||||
}
|
||||
|
||||
|
||||
// GetPacketDefN retrieves a packet matching the given name, if it exists.
|
||||
// returns PacketNotFoundError if a matching packet could not be found.
|
||||
func (tdb *TelemDb) GetPacketDefN(name string) (*PacketDef, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// GetPacketDefF retrieves the parent packet for a given field.
|
||||
// This function cannot return PacketNotFoundError since we have SQL FKs enforcing.
|
||||
func (tdb *TelemDb) GetPacketDefF(field FieldDef) (*PacketDef, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
||||
// GetFieldDefs returns the given fields for a given packet definition.
|
||||
func (tdb *TelemDb) GetFieldDefs(pkt PacketDef) ([]FieldDef, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
|
@ -2,6 +2,7 @@ package logparsers
|
|||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
@ -35,16 +36,16 @@ func NewFormatError(msg string, err error) error {
|
|||
return &FormatError{msg: msg, err: err}
|
||||
}
|
||||
|
||||
// type LineParserFunc is a function that takes a string
|
||||
// type CanFrameParser is a function that takes a string
|
||||
// and returns a can frame. This is useful for common
|
||||
// can dump formats.
|
||||
type LineParserFunc func(string) (can.Frame, time.Time, error)
|
||||
type CanFrameParser func(string) (can.Frame, time.Time, error)
|
||||
|
||||
var candumpRegex = regexp.MustCompile(`^\((\d+)\.(\d{6})\) \w+ (\w+)#(\w+)$`)
|
||||
|
||||
func parseCanDumpLine(dumpLine string) (frame can.Frame, ts time.Time, err error) {
|
||||
frame = can.Frame{}
|
||||
ts = time.Unix(0,0)
|
||||
ts = time.Unix(0, 0)
|
||||
// dumpline looks like this:
|
||||
// (1684538768.521889) can0 200#8D643546
|
||||
// remove trailing newline/whitespaces
|
||||
|
@ -84,13 +85,13 @@ func parseCanDumpLine(dumpLine string) (frame can.Frame, ts time.Time, err error
|
|||
}
|
||||
|
||||
// TODO: add extended id support, need an example log and a test.
|
||||
frame.Id = can.CanID{Id: uint32(id), Extended: false}
|
||||
frame.Id = can.CanID{Id: uint32(id), Extended: false}
|
||||
frame.Data = rawData
|
||||
frame.Kind = can.CanDataFrame
|
||||
|
||||
ts = time.Unix(unixSeconds, unixMicros * int64(time.Microsecond))
|
||||
ts = time.Unix(unixSeconds, unixMicros*int64(time.Microsecond))
|
||||
|
||||
return
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
|
@ -103,7 +104,7 @@ var telemRegex = regexp.MustCompile(`^(\d+)\.(\d{3}) (\w{3})(\w+)$`)
|
|||
|
||||
func parseTelemLogLine(line string) (frame can.Frame, ts time.Time, err error) {
|
||||
frame = can.Frame{}
|
||||
ts = time.Unix(0,0)
|
||||
ts = time.Unix(0, 0)
|
||||
// strip trailng newline since we rely on it being gone
|
||||
line = strings.TrimSpace(line)
|
||||
|
||||
|
@ -157,20 +158,27 @@ func parseTelemLogLine(line string) (frame can.Frame, ts time.Time, err error) {
|
|||
|
||||
}
|
||||
|
||||
// BusParserFunc is a function that takes a string and returns a busevent.
|
||||
type BusParserFunc func(string) (skylab.BusEvent, error)
|
||||
// BusEventParser is a function that takes a string and returns a busevent.
|
||||
type BusEventParser func(string) (skylab.BusEvent, error)
|
||||
|
||||
// parserBusEventMapper takes a line parser (that returns a can frame)
|
||||
// skylabify JSON parser.
|
||||
func parseSkylabifyLogLine(input string) (skylab.BusEvent, error) {
|
||||
var b = skylab.BusEvent{}
|
||||
err := json.Unmarshal([]byte(input), &b)
|
||||
return b, err
|
||||
}
|
||||
|
||||
// frameParseToBusEvent takes a line parser (that returns a can frame)
|
||||
// and makes it return a busEvent instead.
|
||||
func parserBusEventMapper(f LineParserFunc) BusParserFunc {
|
||||
func frameParseToBusEvent(fun CanFrameParser) BusEventParser {
|
||||
return func(s string) (skylab.BusEvent, error) {
|
||||
var b = skylab.BusEvent{}
|
||||
f, ts, err := f(s)
|
||||
frame, ts, err := fun(s)
|
||||
if err != nil {
|
||||
return b, err
|
||||
}
|
||||
b.Timestamp = ts
|
||||
b.Data, err = skylab.FromCanFrame(f)
|
||||
b.Data, err = skylab.FromCanFrame(frame)
|
||||
if err != nil {
|
||||
return b, err
|
||||
}
|
||||
|
@ -179,7 +187,8 @@ func parserBusEventMapper(f LineParserFunc) BusParserFunc {
|
|||
}
|
||||
}
|
||||
|
||||
var ParsersMap = map[string]BusParserFunc{
|
||||
"telem": parserBusEventMapper(parseTelemLogLine),
|
||||
"candump": parserBusEventMapper(parseCanDumpLine),
|
||||
var ParsersMap = map[string]BusEventParser{
|
||||
"telem": frameParseToBusEvent(parseTelemLogLine),
|
||||
"candump": frameParseToBusEvent(parseCanDumpLine),
|
||||
"json": parseSkylabifyLogLine,
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/kschamplin/gotelem/internal/can"
|
||||
"github.com/kschamplin/gotelem/skylab"
|
||||
)
|
||||
|
||||
func Test_parseCanDumpLine(t *testing.T) {
|
||||
|
@ -159,7 +160,6 @@ func Test_parseTelemLogLine_errors(t *testing.T) {
|
|||
name: "utf8 corruption",
|
||||
input: "1698180835.318 0619\xed\xa0\x80fsadfD805640X0EBE24",
|
||||
},
|
||||
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
@ -171,3 +171,42 @@ func Test_parseTelemLogLine_errors(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_parseSkylabifyLogLine(t *testing.T) {
|
||||
type args struct {
|
||||
input string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want skylab.BusEvent
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "basic test",
|
||||
args: args{
|
||||
input: `{"ts":1685141873612,"id":259,"name":"wsl_velocity","data":{"motor_velocity":89.97547,"vehicle_velocity":2.38853}}`},
|
||||
want: skylab.BusEvent{
|
||||
Timestamp: time.UnixMilli(1685141873612),
|
||||
Name: "wsl_velocity",
|
||||
Data: &skylab.WslVelocity{
|
||||
MotorVelocity: 89.97547,
|
||||
VehicleVelocity: 2.38853,
|
||||
},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := parseSkylabifyLogLine(tt.args.input)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("parseSkylabifyLogLine() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("parseSkylabifyLogLine() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package db
|
||||
package gotelem
|
||||
|
||||
import (
|
||||
"embed"
|
|
@ -1,4 +1,4 @@
|
|||
package db
|
||||
package gotelem
|
||||
|
||||
import (
|
||||
"embed"
|
|
@ -1,2 +1 @@
|
|||
DROP TABLE "drive_records";
|
||||
DROP TABLE "position_logs";
|
|
@ -7,12 +7,3 @@ CREATE TABLE "drive_records" (
|
|||
PRIMARY KEY("id" AUTOINCREMENT),
|
||||
CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time)
|
||||
);
|
||||
|
||||
CREATE TABLE "position_logs" (
|
||||
"ts" INTEGER NOT NULL,
|
||||
"source" TEXT NOT NULL,
|
||||
"lat" REAL NOT NULL,
|
||||
"lon" REAL NOT NULL,
|
||||
"elevation" REAL,
|
||||
CONSTRAINT "no_empty_source" CHECK(source is not "")
|
||||
);
|
1
migrations/6_bus_index_column_down.sql
Normal file
1
migrations/6_bus_index_column_down.sql
Normal file
|
@ -0,0 +1 @@
|
|||
ALTER TABLE "bus_events" DROP COLUMN idx;
|
1
migrations/6_bus_index_column_up.sql
Normal file
1
migrations/6_bus_index_column_up.sql
Normal file
|
@ -0,0 +1 @@
|
|||
ALTER TABLE "bus_events" ADD COLUMN idx GENERATED ALWAYS AS (json_extract(data, '$.idx')) VIRTUAL;
|
2
migrations/7_domainobject_down.sql
Normal file
2
migrations/7_domainobject_down.sql
Normal file
|
@ -0,0 +1,2 @@
|
|||
DROP TABLE openmct_objects;
|
||||
DROP INDEX openmct_key;
|
6
migrations/7_domainobject_up.sql
Normal file
6
migrations/7_domainobject_up.sql
Normal file
|
@ -0,0 +1,6 @@
|
|||
CREATE TABLE openmct_objects (
|
||||
data TEXT,
|
||||
key TEXT GENERATED ALWAYS AS (json_extract(data, '$.identifier.key')) VIRTUAL UNIQUE NOT NULL
|
||||
);
|
||||
-- fast key-lookup
|
||||
CREATE INDEX openmct_key on openmct_objects(key);
|
30
openmct.go
Normal file
30
openmct.go
Normal file
|
@ -0,0 +1,30 @@
|
|||
//go:build openmct
|
||||
|
||||
package gotelem
|
||||
|
||||
import (
|
||||
"embed"
|
||||
"io/fs"
|
||||
"net/http"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
)
|
||||
|
||||
// this package provides a web router for the statif openmct build.
|
||||
// it should only be included if the build has been run,
|
||||
// to do so, run npm install and then npm run build.
|
||||
|
||||
//go:embed web/dist
|
||||
var public embed.FS
|
||||
|
||||
func OpenMCTRouter(r chi.Router) {
|
||||
// strip the subdirectory
|
||||
pfs, _ := fs.Sub(public, "web/dist")
|
||||
|
||||
// default route.
|
||||
r.Handle("/*", http.FileServerFS(pfs))
|
||||
}
|
||||
|
||||
func init() {
|
||||
RouterMods = append(RouterMods, OpenMCTRouter)
|
||||
}
|
36
readme.md
36
readme.md
|
@ -68,3 +68,39 @@ Certain features, like socketCAN support, are only enabled on platforms that sup
|
|||
This is handled automatically; builds will exclude the socketCAN files and
|
||||
the additional commands and features will not be present in the CLI.
|
||||
|
||||
### Lightweight Build
|
||||
|
||||
This doesn't include the OpenMCT files, but is simpler to build, and doesn't require Node setup.
|
||||
You must install Go.
|
||||
```
|
||||
$ go build ./cmd/gotelem
|
||||
```
|
||||
|
||||
### Full Build
|
||||
|
||||
This includes an integrated OpenMCT build, which automatically connects to the Telemetry server
|
||||
for historical and live data. You must have both Go and Node.JS installed.
|
||||
|
||||
```
|
||||
$ cd web/
|
||||
$ npm install
|
||||
$ npm run build
|
||||
$ cd ..
|
||||
$ go build -tags openmct ./cmd/gotelem
|
||||
```
|
||||
|
||||
## Development
|
||||
|
||||
During development, it can be useful to have the OpenMCT sources be served separately from Gotelem,
|
||||
so you don't need to rebuild everything. This case is supported:
|
||||
|
||||
```
|
||||
$ go run ./cmd/gotelem server --db gotelem.db # in one terminal
|
||||
$ npm run serve # in a separate terminal
|
||||
```
|
||||
When using the dev server, webpack will set the Gotelem URL to `localhost:8080`. If you're running
|
||||
Gotelem using the default settings, this should work out of the box. Making changes to the OpenMCT
|
||||
plugins will trigger a refresh automatically.
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -18,37 +18,37 @@ import (
|
|||
|
||||
// SkylabFile is a yaml file from skylab.
|
||||
type SkylabFile struct {
|
||||
Packets []PacketDef `yaml:"packets"`
|
||||
Boards []BoardDef `yaml:"boards"`
|
||||
Packets []PacketDef `yaml:"packets,omitempty" json:"packets,omitempty"`
|
||||
Boards []BoardDef `yaml:"boards,omitempty" json:"boards,omitempty"`
|
||||
}
|
||||
|
||||
type BoardDef struct {
|
||||
Name string `yaml:"name"`
|
||||
Transmit []string `yaml:"transmit"`
|
||||
Receive []string `yaml:"receive"`
|
||||
Name string `yaml:"name,omitempty" json:"name,omitempty"`
|
||||
Transmit []string `yaml:"transmit,omitempty" json:"transmit,omitempty"`
|
||||
Receive []string `yaml:"receive,omitempty" json:"receive,omitempty"`
|
||||
}
|
||||
|
||||
// data field.
|
||||
type FieldDef struct {
|
||||
Name string `yaml:"name"`
|
||||
Type string `yaml:"type"`
|
||||
Units string `yaml:"units"`
|
||||
Conversion float32 `yaml:"conversion"`
|
||||
Name string `yaml:"name,omitempty" json:"name,omitempty"`
|
||||
Type string `yaml:"type,omitempty" json:"type,omitempty"`
|
||||
Units string `yaml:"units,omitempty" json:"units,omitempty"`
|
||||
Conversion float32 `yaml:"conversion,omitempty" json:"conversion,omitempty"`
|
||||
Bits []struct {
|
||||
Name string `yaml:"name"`
|
||||
} `yaml:"bits"`
|
||||
Name string `yaml:"name,omitempty" json:"name,omitempty"`
|
||||
} `yaml:"bits,omitempty" json:"bits,omitempty"`
|
||||
}
|
||||
|
||||
// a PacketDef is a full can packet.
|
||||
type PacketDef struct {
|
||||
Name string `yaml:"name"`
|
||||
Description string `yaml:"description"`
|
||||
Id uint32 `yaml:"id"`
|
||||
Endian string `yaml:"endian"`
|
||||
Extended bool `yaml:"is_extended"`
|
||||
Repeat int `yaml:"repeat"`
|
||||
Offset int `yaml:"offset"`
|
||||
Data []FieldDef `yaml:"data"`
|
||||
Name string `yaml:"name,omitempty" json:"name,omitempty"`
|
||||
Description string `yaml:"description,omitempty" json:"description,omitempty"`
|
||||
Id uint32 `yaml:"id,omitempty" json:"id,omitempty"`
|
||||
Endian string `yaml:"endian,omitempty" json:"endian,omitempty"`
|
||||
IsExtended bool `yaml:"is_extended,omitempty" json:"is_extended,omitempty"`
|
||||
Repeat int `yaml:"repeat,omitempty" json:"repeat,omitempty"`
|
||||
Offset int `yaml:"offset,omitempty" json:"offset,omitempty"`
|
||||
Data []FieldDef `yaml:"data,omitempty" json:"data,omitempty"`
|
||||
}
|
||||
|
||||
// we need to generate bitfield types.
|
||||
|
@ -278,13 +278,13 @@ func idToString(p PacketDef) string {
|
|||
if p.Repeat > 0 {
|
||||
resp := make([]string, p.Repeat)
|
||||
for idx := 0; idx < p.Repeat; idx++ {
|
||||
resp[idx] = fmt.Sprintf("can.CanID{ Id: 0x%X, Extended: %t }", int(p.Id)+idx*p.Offset, p.Extended)
|
||||
resp[idx] = fmt.Sprintf("can.CanID{ Id: 0x%X, Extended: %t }", int(p.Id)+idx*p.Offset, p.IsExtended)
|
||||
}
|
||||
|
||||
return strings.Join(resp, ",")
|
||||
|
||||
} else {
|
||||
return fmt.Sprintf("can.CanID{ Id: 0x%X, Extended: %t }", p.Id, p.Extended)
|
||||
return fmt.Sprintf("can.CanID{ Id: 0x%X, Extended: %t }", p.Id, p.IsExtended)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
package skylab
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
@ -74,7 +75,6 @@ type Sizer interface {
|
|||
// CanSend takes a packet and makes CAN framing data.
|
||||
func ToCanFrame(p Packet) (f can.Frame, err error) {
|
||||
|
||||
|
||||
f.Id, err = p.CanId()
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -95,9 +95,9 @@ type RawJsonEvent struct {
|
|||
|
||||
// BusEvent is a timestamped Skylab packet - it contains
|
||||
type BusEvent struct {
|
||||
Timestamp time.Time `json:"ts"`
|
||||
Name string `json:"id"`
|
||||
Data Packet `json:"data"`
|
||||
Timestamp time.Time
|
||||
Name string
|
||||
Data Packet
|
||||
}
|
||||
|
||||
func (e BusEvent) MarshalJSON() (b []byte, err error) {
|
||||
|
@ -117,6 +117,9 @@ func (e BusEvent) MarshalJSON() (b []byte, err error) {
|
|||
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements JSON unmarshalling. Note that this
|
||||
// uses RawJSON events, which are formatted differently.
|
||||
// also it uses int64 milliseconds instead of times.
|
||||
func (e *BusEvent) UnmarshalJSON(b []byte) error {
|
||||
j := &RawJsonEvent{}
|
||||
|
||||
|
@ -133,6 +136,19 @@ func (e *BusEvent) UnmarshalJSON(b []byte) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Equals compares two bus events deeply.
|
||||
func (e *BusEvent) Equals(other *BusEvent) bool {
|
||||
if e.Name != other.Name {
|
||||
return false
|
||||
}
|
||||
if !e.Timestamp.Equal(other.Timestamp) {
|
||||
return false
|
||||
}
|
||||
pkt1, _ := e.Data.MarshalPacket()
|
||||
pkt2, _ := e.Data.MarshalPacket()
|
||||
return bytes.Equal(pkt1, pkt2)
|
||||
}
|
||||
|
||||
// we need to be able to parse the JSON as well. this is done using the
|
||||
// generator since we can use the switch/case thing since it's the fastest
|
||||
|
||||
|
@ -146,8 +162,9 @@ func (e *UnknownIdError) Error() string {
|
|||
|
||||
type BadLengthError struct {
|
||||
expected uint32
|
||||
actual uint32
|
||||
actual uint32
|
||||
}
|
||||
|
||||
func (e *BadLengthError) Error() string {
|
||||
return fmt.Sprintf("bad data length, expected %d, got %d", e.expected, e.actual)
|
||||
}
|
||||
|
|
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large
Load diff
|
@ -46,7 +46,7 @@ type {{$structName}} struct {
|
|||
}
|
||||
|
||||
func (p *{{$structName}}) CanId() (can.CanID, error) {
|
||||
c := can.CanID{Extended: {{.Extended}}}
|
||||
c := can.CanID{Extended: {{.IsExtended}}}
|
||||
{{- if .Repeat }}
|
||||
if p.Idx >= {{.Repeat}} {
|
||||
return c, &UnknownIdError{ {{ printf "0x%X" .Id }} }
|
||||
|
@ -108,10 +108,10 @@ var idMap = map[can.CanID]bool{
|
|||
{{ range $p := .Packets -}}
|
||||
{{ if $p.Repeat }}
|
||||
{{ range $idx := Nx (int $p.Id) $p.Repeat $p.Offset -}}
|
||||
{ Id: {{ $idx | printf "0x%X"}}, Extended: {{$p.Extended}} }: true,
|
||||
{ Id: {{ $idx | printf "0x%X"}}, Extended: {{$p.IsExtended}} }: true,
|
||||
{{ end }}
|
||||
{{- else }}
|
||||
{ Id: {{ $p.Id | printf "0x%X" }}, Extended: {{$p.Extended}} }: true,
|
||||
{ Id: {{ $p.Id | printf "0x%X" }}, Extended: {{$p.IsExtended}} }: true,
|
||||
{{- end}}
|
||||
{{- end}}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package skylab
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"reflect"
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
|
@ -44,4 +45,20 @@ func TestJSON{{$structName}}(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
func TestCanFrame{{$structName}}(t *testing.T) {
|
||||
v := &{{$structName}}{}
|
||||
frame, err := ToCanFrame(v)
|
||||
if err != nil {
|
||||
t.Fatalf("expected no error, got %v", err)
|
||||
}
|
||||
|
||||
retpkt, err := FromCanFrame(frame)
|
||||
if err != nil {
|
||||
t.Fatalf("expected no error, got %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(v, retpkt) {
|
||||
t.Fatalf("decoded packet did not match sent %v got %v", v, retpkt)
|
||||
}
|
||||
}
|
||||
|
||||
{{- end }}
|
||||
|
|
|
@ -12,5 +12,3 @@ import (
|
|||
|
||||
// this program demonstrates basic CAN stuff.
|
||||
// i give up this shit is so hard
|
||||
|
||||
|
||||
|
|
|
@ -134,17 +134,18 @@ func (sck *CanSocket) Send(msg *can.Frame) error {
|
|||
|
||||
idToWrite := msg.Id.Id
|
||||
|
||||
if (msg.Id.Extended) {
|
||||
if msg.Id.Extended {
|
||||
idToWrite &= unix.CAN_EFF_MASK
|
||||
idToWrite |= unix.CAN_EFF_FLAG
|
||||
}
|
||||
|
||||
|
||||
switch msg.Kind {
|
||||
case can.CanRTRFrame:
|
||||
idToWrite |= unix.CAN_RTR_FLAG
|
||||
case can.CanErrFrame:
|
||||
return errors.New("you can't send error frames")
|
||||
case can.CanDataFrame:
|
||||
|
||||
default:
|
||||
return errors.New("unknown frame type")
|
||||
}
|
||||
|
@ -191,10 +192,10 @@ func (sck *CanSocket) Recv() (*can.Frame, error) {
|
|||
id.Id = raw_id
|
||||
if raw_id&unix.CAN_EFF_FLAG != 0 {
|
||||
// extended id frame
|
||||
id.Extended = true;
|
||||
id.Extended = true
|
||||
} else {
|
||||
// it's a normal can frame
|
||||
id.Extended = false;
|
||||
id.Extended = false
|
||||
}
|
||||
|
||||
var k can.Kind = can.CanDataFrame
|
||||
|
@ -203,8 +204,8 @@ func (sck *CanSocket) Recv() (*can.Frame, error) {
|
|||
// we got an error...
|
||||
k = can.CanErrFrame
|
||||
}
|
||||
|
||||
if raw_id & unix.CAN_RTR_FLAG != 0 {
|
||||
|
||||
if raw_id&unix.CAN_RTR_FLAG != 0 {
|
||||
k = can.CanRTRFrame
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ func TestCanSocket(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
t.Run("test name", func(t *testing.T) {
|
||||
t.Run("test interface name", func(t *testing.T) {
|
||||
sock, _ := NewCanSocket("vcan0")
|
||||
defer sock.Close()
|
||||
|
||||
|
@ -51,7 +51,7 @@ func TestCanSocket(t *testing.T) {
|
|||
err := sock.Send(testFrame)
|
||||
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
})
|
||||
|
||||
|
|
131
web/.gitignore
vendored
Normal file
131
web/.gitignore
vendored
Normal file
|
@ -0,0 +1,131 @@
|
|||
# Logs
|
||||
logs
|
||||
*.log
|
||||
npm-debug.log*
|
||||
yarn-debug.log*
|
||||
yarn-error.log*
|
||||
lerna-debug.log*
|
||||
.pnpm-debug.log*
|
||||
|
||||
# Diagnostic reports (https://nodejs.org/api/report.html)
|
||||
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
|
||||
|
||||
# Runtime data
|
||||
pids
|
||||
*.pid
|
||||
*.seed
|
||||
*.pid.lock
|
||||
|
||||
# Directory for instrumented libs generated by jscoverage/JSCover
|
||||
lib-cov
|
||||
|
||||
# Coverage directory used by tools like istanbul
|
||||
coverage
|
||||
*.lcov
|
||||
|
||||
# nyc test coverage
|
||||
.nyc_output
|
||||
|
||||
# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
|
||||
.grunt
|
||||
|
||||
# Bower dependency directory (https://bower.io/)
|
||||
bower_components
|
||||
|
||||
# node-waf configuration
|
||||
.lock-wscript
|
||||
|
||||
# Compiled binary addons (https://nodejs.org/api/addons.html)
|
||||
build/Release
|
||||
|
||||
# Dependency directories
|
||||
node_modules/
|
||||
jspm_packages/
|
||||
|
||||
# Snowpack dependency directory (https://snowpack.dev/)
|
||||
web_modules/
|
||||
|
||||
# TypeScript cache
|
||||
*.tsbuildinfo
|
||||
|
||||
# Optional npm cache directory
|
||||
.npm
|
||||
|
||||
# Optional eslint cache
|
||||
.eslintcache
|
||||
|
||||
# Optional stylelint cache
|
||||
.stylelintcache
|
||||
|
||||
# Microbundle cache
|
||||
.rpt2_cache/
|
||||
.rts2_cache_cjs/
|
||||
.rts2_cache_es/
|
||||
.rts2_cache_umd/
|
||||
|
||||
# Optional REPL history
|
||||
.node_repl_history
|
||||
|
||||
# Output of 'npm pack'
|
||||
*.tgz
|
||||
|
||||
# Yarn Integrity file
|
||||
.yarn-integrity
|
||||
|
||||
# dotenv environment variable files
|
||||
.env
|
||||
.env.development.local
|
||||
.env.test.local
|
||||
.env.production.local
|
||||
.env.local
|
||||
|
||||
# parcel-bundler cache (https://parceljs.org/)
|
||||
.cache
|
||||
.parcel-cache
|
||||
|
||||
# Next.js build output
|
||||
.next
|
||||
out
|
||||
|
||||
# Nuxt.js build / generate output
|
||||
.nuxt
|
||||
dist
|
||||
|
||||
# Gatsby files
|
||||
.cache/
|
||||
# Comment in the public line in if your project uses Gatsby and not Next.js
|
||||
# https://nextjs.org/blog/next-9-1#public-directory-support
|
||||
# public
|
||||
|
||||
# vuepress build output
|
||||
.vuepress/dist
|
||||
|
||||
# vuepress v2.x temp and cache directory
|
||||
.temp
|
||||
.cache
|
||||
|
||||
# Docusaurus cache and generated files
|
||||
.docusaurus
|
||||
|
||||
# Serverless directories
|
||||
.serverless/
|
||||
|
||||
# FuseBox cache
|
||||
.fusebox/
|
||||
|
||||
# DynamoDB Local files
|
||||
.dynamodb/
|
||||
|
||||
# TernJS port file
|
||||
.tern-port
|
||||
|
||||
# Stores VSCode versions used for testing VSCode extensions
|
||||
.vscode-test
|
||||
|
||||
# yarn v2
|
||||
.yarn/cache
|
||||
.yarn/unplugged
|
||||
.yarn/build-state.yml
|
||||
.yarn/install-state.gz
|
||||
.pnp.*
|
||||
|
10
web/eslint.config.js
Normal file
10
web/eslint.config.js
Normal file
|
@ -0,0 +1,10 @@
|
|||
import eslint from '@eslint/js';
|
||||
import tseslint from 'typescript-eslint';
|
||||
|
||||
export default tseslint.config(
|
||||
eslint.configs.recommended,
|
||||
...tseslint.configs.recommended,
|
||||
{
|
||||
"ignores": ["dist/*"]
|
||||
}
|
||||
);
|
5920
web/package-lock.json
generated
Normal file
5920
web/package-lock.json
generated
Normal file
File diff suppressed because it is too large
Load diff
30
web/package.json
Normal file
30
web/package.json
Normal file
|
@ -0,0 +1,30 @@
|
|||
{
|
||||
"name": "g1_openmct",
|
||||
"version": "1.0.0",
|
||||
"description": "dev environment for openmct plugins for g1 strategy tool",
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"build": "webpack --config webpack.prod.js",
|
||||
"serve": "webpack serve --config webpack.dev.js"
|
||||
},
|
||||
"author": "",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"openmct": "^3.2.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/node": "^20.11.25",
|
||||
"@types/webpack": "^5.28.5",
|
||||
"copy-webpack-plugin": "^12.0.2",
|
||||
"eslint": "^8.57.0",
|
||||
"html-webpack-plugin": "^5.6.0",
|
||||
"ts-loader": "^9.5.1",
|
||||
"ts-node": "^10.9.2",
|
||||
"typescript": "^5.4.2",
|
||||
"typescript-eslint": "^7.1.1",
|
||||
"webpack": "^5.90.3",
|
||||
"webpack-cli": "^5.1.4",
|
||||
"webpack-dev-server": "^5.0.2",
|
||||
"webpack-merge": "^5.10.0"
|
||||
}
|
||||
}
|
332
web/src/app.ts
Normal file
332
web/src/app.ts
Normal file
|
@ -0,0 +1,332 @@
|
|||
import openmct from "openmct";
|
||||
|
||||
//@ts-expect-error openmct
|
||||
openmct.setAssetPath('openmct');
|
||||
//@ts-expect-error openmct
|
||||
openmct.install(openmct.plugins.LocalStorage());
|
||||
//@ts-expect-error openmct
|
||||
openmct.install(openmct.plugins.MyItems());
|
||||
//@ts-expect-error openmct
|
||||
openmct.install(openmct.plugins.Timeline());
|
||||
//@ts-expect-error openmct
|
||||
openmct.install(openmct.plugins.UTCTimeSystem());
|
||||
//@ts-expect-error openmct
|
||||
openmct.install(openmct.plugins.Clock({ enableClockIndicator: true }));
|
||||
//@ts-expect-error openmct
|
||||
openmct.install(openmct.plugins.Timer());
|
||||
//@ts-expect-error openmct
|
||||
openmct.install(openmct.plugins.Timelist());
|
||||
//@ts-expect-error openmct
|
||||
openmct.install(openmct.plugins.Hyperlink())
|
||||
//@ts-expect-error openmct
|
||||
openmct.install(openmct.plugins.Notebook())
|
||||
//@ts-expect-error openmct
|
||||
openmct.install(openmct.plugins.BarChart())
|
||||
//@ts-expect-error openmct
|
||||
openmct.install(openmct.plugins.ScatterPlot())
|
||||
//@ts-expect-error openmct
|
||||
openmct.install(openmct.plugins.SummaryWidget())
|
||||
//@ts-expect-error openmct
|
||||
openmct.install(openmct.plugins.LADTable());
|
||||
openmct.time.clock('local', { start: -5 * 60 * 1000, end: 0 });
|
||||
//@ts-expect-error openmct
|
||||
openmct.time.timeSystem('utc');
|
||||
//@ts-expect-error openmct
|
||||
openmct.install(openmct.plugins.Espresso());
|
||||
|
||||
openmct.install(
|
||||
//@ts-expect-error openmct
|
||||
openmct.plugins.Conductor({
|
||||
menuOptions: [
|
||||
{
|
||||
name: 'Fixed',
|
||||
timeSystem: 'utc',
|
||||
bounds: {
|
||||
start: Date.now() - 30000000,
|
||||
end: Date.now()
|
||||
},
|
||||
|
||||
},
|
||||
{
|
||||
name: 'Realtime',
|
||||
timeSystem: 'utc',
|
||||
clock: 'local',
|
||||
clockOffsets: {
|
||||
start: -30000000,
|
||||
end: 30000
|
||||
},
|
||||
|
||||
|
||||
}
|
||||
]
|
||||
})
|
||||
);
|
||||
|
||||
|
||||
|
||||
if (process.env.BASE_URL) {
|
||||
console.log("got a thing")
|
||||
console.log(process.env.BASE_URL)
|
||||
}
|
||||
interface SkylabField {
|
||||
name: string
|
||||
type: string
|
||||
units?: string
|
||||
conversion?: number
|
||||
bits?: {
|
||||
name: string
|
||||
}
|
||||
}
|
||||
interface SkylabPacket {
|
||||
name: string
|
||||
description?: string
|
||||
id: number
|
||||
endian?: string
|
||||
is_extended: boolean
|
||||
repeat?: number
|
||||
offset?: number
|
||||
data: [SkylabField]
|
||||
}
|
||||
|
||||
interface SkylabBoard {
|
||||
name: string
|
||||
transmit: [string]
|
||||
receive: [string]
|
||||
}
|
||||
|
||||
interface SkylabSchema {
|
||||
packets: [SkylabPacket]
|
||||
boards: [SkylabBoard]
|
||||
}
|
||||
|
||||
let schemaCached = null;
|
||||
function getSchema(): Promise<SkylabSchema> {
|
||||
if (schemaCached === null) {
|
||||
return fetch(`${process.env.BASE_URL}/api/v1/schema`).then((resp) => {
|
||||
const res = resp.json()
|
||||
console.log("got schema, caching", res);
|
||||
schemaCached = res
|
||||
return res
|
||||
})
|
||||
}
|
||||
return Promise.resolve(schemaCached)
|
||||
}
|
||||
|
||||
const objectProvider = {
|
||||
get: function (id) {
|
||||
return getSchema().then((schema) => {
|
||||
if (id.key === "car") {
|
||||
const comp = schema.packets.map((x) => {
|
||||
return {
|
||||
key: x.name,
|
||||
namespace: "umnsvp"
|
||||
}
|
||||
})
|
||||
return {
|
||||
identifier: id,
|
||||
name: "the solar car",
|
||||
type: 'folder',
|
||||
location: 'ROOT',
|
||||
composition: comp
|
||||
}
|
||||
}
|
||||
const pkt = schema.packets.find((x) => x.name === id.key)
|
||||
if (pkt) {
|
||||
// if the key matches one of the packet names,
|
||||
// we know it's a packet.
|
||||
|
||||
// construct a list of fields for this packet.
|
||||
const comp = pkt.data.map((field) => {
|
||||
if (field.type === "bitfield") {
|
||||
//
|
||||
|
||||
}
|
||||
return {
|
||||
// we have to do this since
|
||||
// we can't get the packet name otherwise.
|
||||
key: `${pkt.name}.${field.name}`,
|
||||
namespace: "umnsvp"
|
||||
}
|
||||
})
|
||||
return {
|
||||
identifier: id,
|
||||
name: pkt.name,
|
||||
type: 'folder',
|
||||
composition: comp
|
||||
}
|
||||
}
|
||||
// at this point it's definitely a field aka umnsvp-datum
|
||||
const [pktName, fieldName] = id.key.split('.')
|
||||
return {
|
||||
identifier: id,
|
||||
name: fieldName,
|
||||
type: 'umnsvp-datum',
|
||||
conversion: schema.packets.find((x) => x.name === pktName).data.find((f) => f.name === fieldName).conversion,
|
||||
telemetry: {
|
||||
values: [
|
||||
{
|
||||
key: "value",
|
||||
source: "val",
|
||||
name: "Value",
|
||||
"format": "float",
|
||||
hints: {
|
||||
range: 1
|
||||
}
|
||||
},
|
||||
{
|
||||
key: "utc",
|
||||
source: "ts",
|
||||
name: "Timestamp",
|
||||
format: "utc",
|
||||
hints: {
|
||||
domain: 1
|
||||
}
|
||||
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
interface Datum {
|
||||
ts: number
|
||||
val: number
|
||||
}
|
||||
const TelemHistoryProvider = {
|
||||
supportsRequest: function (dObj) {
|
||||
return dObj.type === 'umnsvp-datum'
|
||||
},
|
||||
request: function (dObj, opt) {
|
||||
const [pktName, fieldName] = dObj.identifier.key.split('.')
|
||||
const url = `${process.env.BASE_URL}/api/v1/packets/${pktName}/${fieldName}?`
|
||||
const params = new URLSearchParams({
|
||||
start: new Date(opt.start).toISOString(),
|
||||
end: new Date(opt.end).toISOString(),
|
||||
})
|
||||
console.log((opt.end - opt.start) / opt.size)
|
||||
return fetch(url + params).then((resp) => {
|
||||
resp.json().then((result: [Datum]) => {
|
||||
|
||||
if (dObj.conversion && dObj.conversion != 0) {
|
||||
// apply conversion
|
||||
result.map((dat) => {
|
||||
dat.val = dat.val * dObj.conversion
|
||||
return dat
|
||||
})
|
||||
}
|
||||
return result
|
||||
})
|
||||
})
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
interface PacketData {
|
||||
ts: number
|
||||
name: string
|
||||
data: object
|
||||
}
|
||||
|
||||
function TelemRealtimeProvider() {
|
||||
return function (openmct: openmct.OpenMCT) {
|
||||
|
||||
const simpleIndicator = openmct.indicators.simpleIndicator();
|
||||
openmct.indicators.add(simpleIndicator);
|
||||
simpleIndicator.text("0 Listeners")
|
||||
const url = `${process.env.BASE_URL.replace(/^http/, 'ws')}/api/v1/packets/subscribe?`
|
||||
// we put our websocket connection here.
|
||||
let connection = new WebSocket(url)
|
||||
// connections contains name: callback mapping
|
||||
const callbacks = {}
|
||||
|
||||
const conversions: Map<string, number> = new Map()
|
||||
// names contains a set of *packet names*
|
||||
const names = new Set()
|
||||
|
||||
function handleMessage(event) {
|
||||
const data: PacketData = JSON.parse(event.data)
|
||||
for (const [key, value] of Object.entries(data.data)) { // for each of the fields in the data
|
||||
const id = `${data.name}.${key}` // if we have a matching callback for that field.
|
||||
if (id in callbacks) {
|
||||
// we should construct a telem point and make a callback.
|
||||
// compute if we need to scale the value.
|
||||
callbacks[id]({
|
||||
"ts": data.ts,
|
||||
"val": value * conversions.get(id)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function updateWebsocket() {
|
||||
const params = new URLSearchParams()
|
||||
for (const name in names) {
|
||||
params.append("name", name)
|
||||
}
|
||||
connection = new WebSocket(url + params)
|
||||
|
||||
connection.onmessage = handleMessage
|
||||
simpleIndicator.text(`${names.size} Listeners`)
|
||||
}
|
||||
|
||||
const provider = {
|
||||
supportsSubscribe: function (dObj) {
|
||||
return dObj.type === "umnsvp-datum"
|
||||
},
|
||||
subscribe: function (dObj, callback) {
|
||||
// identifier is packetname.fieldname. we add the packet name to the set.
|
||||
const key = dObj.identifier.key
|
||||
const pktName = key.split('.')[0]
|
||||
// add our callback to the dictionary,
|
||||
// add the packet name to the set
|
||||
callbacks[key] = callback
|
||||
conversions.set(key, dObj.conversion || 1)
|
||||
names.add(pktName)
|
||||
// update the websocket URL with the new name.
|
||||
updateWebsocket()
|
||||
return function unsubscribe() {
|
||||
// if there's no more listeners on this packet,
|
||||
// we can remove it.
|
||||
console.log("subscribe called %s", JSON.stringify(dObj))
|
||||
if (!Object.keys(callbacks).some((k) => k.startsWith(pktName))) {
|
||||
names.delete(pktName)
|
||||
updateWebsocket()
|
||||
}
|
||||
|
||||
delete callbacks[key]
|
||||
conversions.delete(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
openmct.telemetry.addProvider(provider)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
function GotelemPlugin() {
|
||||
return function install(openmct) {
|
||||
|
||||
openmct.types.addType('umnsvp-datum', {
|
||||
name: "UMN SVP Data Field",
|
||||
description: "A data field of a packet from the car",
|
||||
creatable: false,
|
||||
cssClass: "icon-telemetry"
|
||||
})
|
||||
openmct.objects.addRoot({
|
||||
namespace: "umnsvp",
|
||||
key: 'car'
|
||||
}, openmct.priority.HIGH)
|
||||
openmct.objects.addProvider('umnsvp', objectProvider);
|
||||
openmct.telemetry.addProvider(TelemHistoryProvider)
|
||||
openmct.install(TelemRealtimeProvider())
|
||||
}
|
||||
}
|
||||
|
||||
openmct.install(GotelemPlugin())
|
||||
|
||||
//@ts-expect-error openmct
|
||||
openmct.start()
|
10
web/src/index.html
Normal file
10
web/src/index.html
Normal file
|
@ -0,0 +1,10 @@
|
|||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>Open MCT Tutorials</title>
|
||||
<script src="openmct/openmct.js"></script>
|
||||
</head>
|
||||
<body>
|
||||
<div id="app"></div>
|
||||
</body>
|
||||
</html>
|
19
web/tsconfig.json
Normal file
19
web/tsconfig.json
Normal file
|
@ -0,0 +1,19 @@
|
|||
{
|
||||
"compilerOptions": {
|
||||
// "baseUrl": "./src",
|
||||
"target": "es6",
|
||||
"checkJs": true,
|
||||
"allowJs": true,
|
||||
// "moduleResolution": "NodeNext",
|
||||
"allowSyntheticDefaultImports": true,
|
||||
"paths": {
|
||||
"openmct": ["./node_modules/openmct/dist/openmct.d.ts"]
|
||||
},
|
||||
"esModuleInterop": true,
|
||||
},
|
||||
"exclude": [
|
||||
"./dist/**/*",
|
||||
"webpack.*.js",
|
||||
"eslint.config.js"
|
||||
]
|
||||
}
|
40
web/webpack.common.js
Normal file
40
web/webpack.common.js
Normal file
|
@ -0,0 +1,40 @@
|
|||
import path from 'path';
|
||||
import {fileURLToPath} from 'url';
|
||||
import HtmlWebpackPlugin from 'html-webpack-plugin';
|
||||
import CopyPlugin from 'copy-webpack-plugin';
|
||||
|
||||
const config = {
|
||||
entry: './src/app.ts',
|
||||
module: {
|
||||
rules: [
|
||||
{
|
||||
test: /\.tsx?$/,
|
||||
use: 'ts-loader',
|
||||
exclude: /node_modules/,
|
||||
},
|
||||
],
|
||||
},
|
||||
plugins: [
|
||||
new HtmlWebpackPlugin({
|
||||
template: 'src/index.html',
|
||||
filename: 'index.html',
|
||||
}),
|
||||
new CopyPlugin({
|
||||
patterns: [
|
||||
{ from: "**/*", to: "openmct/", context: "node_modules/openmct/dist"},
|
||||
]
|
||||
})
|
||||
],
|
||||
resolve: {
|
||||
extensions: ['.tsx', '.ts', '.js'],
|
||||
},
|
||||
externals: {
|
||||
openmct: "openmct",
|
||||
},
|
||||
output: {
|
||||
filename: 'main.js',
|
||||
path: path.resolve(path.dirname(fileURLToPath(import.meta.url)), 'dist'),
|
||||
},
|
||||
};
|
||||
|
||||
export default config
|
24
web/webpack.dev.js
Normal file
24
web/webpack.dev.js
Normal file
|
@ -0,0 +1,24 @@
|
|||
import {merge} from 'webpack-merge';
|
||||
import common from "./webpack.common.js"
|
||||
import webpack from 'webpack'
|
||||
|
||||
const config = merge(common, {
|
||||
mode: "development",
|
||||
devtool: 'inline-source-map',
|
||||
plugins: [
|
||||
new webpack.EnvironmentPlugin({
|
||||
NODE_ENV: "development",
|
||||
BASE_URL: "http://localhost:8080"
|
||||
}),
|
||||
],
|
||||
devServer: {
|
||||
static: "./dist",
|
||||
headers: {
|
||||
"Access-Control-Allow-Origin": "*",
|
||||
'Access-Control-Allow-Headers': '*',
|
||||
'Access-Control-Allow-Methods': '*',
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
export default config
|
16
web/webpack.prod.js
Normal file
16
web/webpack.prod.js
Normal file
|
@ -0,0 +1,16 @@
|
|||
import { merge } from 'webpack-merge'
|
||||
import common from './webpack.common.js'
|
||||
import webpack from 'webpack'
|
||||
|
||||
const config = merge(common, {
|
||||
mode: "production",
|
||||
plugins: [
|
||||
new webpack.EnvironmentPlugin({
|
||||
NODE_ENV: "production",
|
||||
BASE_URL: "",
|
||||
})
|
||||
],
|
||||
devtool: 'source-map',
|
||||
})
|
||||
|
||||
export default config
|
|
@ -5,7 +5,6 @@ import (
|
|||
"encoding/binary"
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
)
|
||||
|
||||
// Frameable is an object that can be sent in an XBee Frame. An XBee Frame
|
||||
|
@ -104,7 +103,6 @@ func xbeeFrameSplit(data []byte, atEOF bool) (advance int, token []byte, err err
|
|||
}
|
||||
// FIXME: add bounds checking! this can panic.
|
||||
var frameLen = int(binary.BigEndian.Uint16(data[startIdx+1:startIdx+3])) + 4
|
||||
|
||||
|
||||
// if the value of frameLen is > 0x100, we know that it's screwed up.
|
||||
// this helps keep error duration lowered.
|
||||
|
|
|
@ -103,14 +103,13 @@ func Test_xbeeFrameSplit(t *testing.T) {
|
|||
{
|
||||
name: "start delimiter inside partial packet",
|
||||
args: args{
|
||||
data: advTest,
|
||||
data: advTest,
|
||||
atEOF: false,
|
||||
},
|
||||
wantAdvance: 2,
|
||||
wantToken: nil,
|
||||
wantErr: false,
|
||||
wantToken: nil,
|
||||
wantErr: false,
|
||||
},
|
||||
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
@ -129,8 +128,6 @@ func Test_xbeeFrameSplit(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
func Test_parseFrame(t *testing.T) {
|
||||
type args struct {
|
||||
frame []byte
|
||||
|
|
Loading…
Reference in a new issue