Compare commits

...

46 commits

Author SHA1 Message Date
saji 900ad6d495 release build correct package
All checks were successful
release / release-full (push) Successful in 8m25s
2024-03-09 09:15:01 -06:00
saji 16cc7019bc fix xgo version
All checks were successful
release / release-full (push) Successful in 7m40s
2024-03-09 08:37:07 -06:00
saji 4566ea369c make release crossbuild
Some checks failed
release / release-full (push) Failing after 1m55s
2024-03-09 08:33:21 -06:00
saji 1e72b93143 remove unused variable
All checks were successful
Node.js CI / build-openmct (push) Successful in 1m26s
2024-03-08 22:00:18 -06:00
saji 48b40ee30f make CI only run when affected 2024-03-08 21:59:22 -06:00
saji 9e9081fa4a fix unused dotenv plugin
Some checks failed
Go / build (1.21) (push) Successful in 1m7s
Go / build (1.22) (push) Successful in 1m5s
Node.js CI / build (push) Failing after 1m37s
2024-03-08 21:55:15 -06:00
saji 2dc5a0457b added node ci
Some checks failed
Node.js CI / build (push) Failing after 48s
Go / build (1.21) (push) Successful in 1m6s
Go / build (1.22) (push) Successful in 1m5s
2024-03-08 21:53:26 -06:00
saji 0f2af76156 add some types
All checks were successful
Go / build (1.21) (push) Successful in 1m8s
Go / build (1.22) (push) Successful in 1m7s
2024-03-08 19:25:08 -06:00
saji 2e36581665 fix: add missing files
All checks were successful
Go / build (1.21) (push) Successful in 1m7s
Go / build (1.22) (push) Successful in 1m5s
2024-03-08 17:09:08 -06:00
saji 4829dd50c7 move openmct plugin to typescript
All checks were successful
Go / build (1.21) (push) Successful in 1m5s
Go / build (1.22) (push) Successful in 1m4s
remove unused livestream test function
2024-03-08 17:08:43 -06:00
saji 13205c1668 got realtime working
Some checks failed
Go / build (1.21) (push) Failing after 1m6s
Go / build (1.22) (push) Failing after 1m5s
added demo livestream for testing
added openMCT realtime plugin
fixed websocket cross-origin fail
2024-03-08 11:51:59 -06:00
saji fe4cdfa0a4 revert order by change
All checks were successful
Go / build (1.21) (push) Successful in 1m8s
Go / build (1.22) (push) Successful in 1m7s
2024-03-08 09:40:54 -06:00
saji e9d40ce466 remove broker-db listener, db options
All checks were successful
Go / build (1.21) (push) Successful in 1m11s
Go / build (1.22) (push) Successful in 1m9s
2024-03-08 09:28:50 -06:00
saji d702395d5b cleanup comparison (thanks staticcheck)
All checks were successful
Go / build (1.21) (push) Successful in 1m8s
Go / build (1.22) (push) Successful in 1m7s
2024-03-07 23:52:41 -06:00
saji 90e8c3f101 more tests! added BusEvent.Equals
Some checks failed
Go / build (1.21) (push) Failing after 1m6s
Go / build (1.22) (push) Failing after 1m5s
2024-03-07 23:50:13 -06:00
saji a28393388b tests! 2024-03-07 23:02:46 -06:00
saji 54b7427428 misc cleanup
All checks were successful
Go / build (1.21) (push) Successful in 1m7s
Go / build (1.22) (push) Successful in 1m6s
2024-03-07 16:42:49 -06:00
saji e08ab050ef fix websocket endpoint 2024-03-07 16:42:27 -06:00
saji cf112ef561 fix socketcan bugs
All checks were successful
Go / build (1.21) (push) Successful in 1m7s
Go / build (1.22) (push) Successful in 1m5s
2024-03-07 15:06:16 -06:00
saji 641c35afbd add more logging to BusEvent Broker 2024-03-07 15:05:51 -06:00
saji 4a292aa009 change default database path to gotelem.db 2024-03-07 15:05:31 -06:00
saji d5381a3c33 add comments
All checks were successful
Go / build (1.21) (push) Successful in 1m6s
Go / build (1.22) (push) Successful in 1m4s
2024-03-07 13:30:32 -06:00
saji 0b5a917e40 add JSON line parser, rename logparsers 2024-03-07 13:30:15 -06:00
saji 1ff4adf5e4 add generator test for skylab 2024-03-07 13:29:50 -06:00
saji c8034066c9 add delete document test
All checks were successful
Go / build (1.21) (push) Successful in 1m3s
Go / build (1.22) (push) Successful in 1m1s
2024-03-07 07:34:55 -06:00
saji 3c1a96c8e0 format
All checks were successful
Go / build (1.21) (push) Successful in 1m18s
Go / build (1.22) (push) Successful in 1m16s
2024-03-07 06:18:49 -06:00
saji f380631b5e fix Close() before checking errors
All checks were successful
Go / build (1.21) (push) Successful in 1m14s
Go / build (1.22) (push) Successful in 1m12s
2024-03-06 18:45:32 -06:00
saji 456f84b5c7 remove QueryModifiers, replace with explicit
Some checks failed
Go / build (1.21) (push) Failing after 1m4s
Go / build (1.22) (push) Failing after 1m3s
2024-03-06 17:19:16 -06:00
saji daf4fe97dc added DocumentNotFound error 2024-03-06 17:09:02 -06:00
saji 5b38daf74f wip: Telem DB Document API and Tests 2024-03-06 16:42:39 -06:00
saji 7a98f52542 hack: remove faulty table from old migration 2024-03-06 15:16:13 -06:00
saji 0a6a6bb66d add openmct domain object table and skeleton 2024-03-06 15:15:56 -06:00
saji c9b73ee006 add repeated packet support using index parameter 2024-03-06 14:53:39 -06:00
saji b266a84324 fix multiple name packet filter 2024-03-06 14:53:25 -06:00
saji d591fa21b6 fix db test
All checks were successful
Go / build (1.21) (push) Successful in 2m32s
Go / build (1.22) (push) Successful in 2m30s
2024-03-06 10:51:54 -06:00
saji 8e314e9303 add orderby clause
Some checks failed
Go / build (1.22) (push) Failing after 1m58s
Go / build (1.21) (push) Failing after 2m1s
2024-03-06 10:48:40 -06:00
saji d90d7a0af4 openmct historical plugin MVP
All checks were successful
Go / build (1.21) (push) Successful in 1m16s
Go / build (1.22) (push) Successful in 1m15s
2024-03-05 09:49:08 -06:00
saji 9ec01c39de fix skylab json formatting
All checks were successful
Go / build (1.22) (push) Successful in 1m16s
Go / build (1.21) (push) Successful in 1m18s
2024-03-04 20:41:15 -06:00
saji bcd61321e6 add CORS to api 2024-03-04 20:40:55 -06:00
saji a015911e0e split openmct dev/prod 2024-03-04 20:40:32 -06:00
saji 648f2183c2 added node install/build
All checks were successful
Go / build (1.21) (push) Successful in 1m16s
Go / build (1.22) (push) Successful in 1m15s
release / release-openmct (push) Successful in 2m5s
2024-03-03 23:33:48 -06:00
saji 860d749c6b added openmct release
Some checks failed
release / release-openmct (push) Failing after 56s
Go / build (1.21) (push) Successful in 1m18s
Go / build (1.22) (push) Successful in 1m18s
2024-03-03 23:29:50 -06:00
saji 058e8d31b2 update packages
All checks were successful
Go / build (1.21) (push) Successful in 1m12s
Go / build (1.22) (push) Successful in 1m12s
2024-03-03 23:13:55 -06:00
saji 8b8619dd8a added openmct plugin and embedding
All checks were successful
Go / build (1.21) (push) Successful in 1m17s
Go / build (1.22) (push) Successful in 1m15s
2024-03-03 23:04:41 -06:00
saji 93be82f416 update gitignore for WAL mode sqlite db
All checks were successful
Go / build (1.21) (push) Successful in 1m14s
Go / build (1.22) (push) Successful in 1m12s
2024-03-02 21:56:31 -06:00
saji 4e6f8db7ed combine separate packages
All checks were successful
Go / build (1.21) (push) Successful in 1m16s
Go / build (1.22) (push) Successful in 1m15s
2024-03-02 21:49:18 -06:00
61 changed files with 9539 additions and 777 deletions

46
.github/workflows/build-openmct.yml vendored Normal file
View file

@ -0,0 +1,46 @@
name: release
on:
push:
tags:
- '*'
jobs:
release-full:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: 1.22
- name: Set up Node
uses: actions/setup-node@v4
- name: Install OpenMCT
run: npm ci
working-directory: web/
- name: Build OpenMCT bundle
run: npm run build
working-directory: web/
- name: Build
uses: crazy-max/ghaction-xgo@v3
with:
xgo_version: latest
go_version: 1.21
pkg: cmd/gotelem
dest: build
prefix: gotelem-full
targets: windows/amd64,linux/amd64,linux/arm64,linux/arm/v7,darwin/arm64,darwin/amd64
tags: openmct
v: true
x: false
race: false
ldflags: -s -w
buildmode: default
trimpath: true
- name: Release binaries
uses: https://gitea.com/actions/release-action@main
with:
files: |-
build/**
api_key: '${{secrets.RELEASE_TOKEN}}'

View file

@ -3,10 +3,12 @@ name: Go
on:
push:
branches: [ "master" ]
paths:
- "**.go"
jobs:
build:
build-gotelem:
runs-on: ubuntu-latest
strategy:
matrix:

25
.github/workflows/nodejs.yml vendored Normal file
View file

@ -0,0 +1,25 @@
name: Node.js CI
on:
push:
paths:
- "web/**"
jobs:
build-openmct:
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./web/
steps:
- uses: actions/checkout@v4
- name: Use Node.js
uses: actions/setup-node@v4
with:
node-version: '20.x'
- run: npm ci
- run: npm run build --if-present
- run: npx eslint .

6
.gitignore vendored
View file

@ -27,4 +27,8 @@ go.work
/skylabify
*.db
*.db-journal
/logs/
/logs/
*.db-wal
*.db-shm
*.sqbpro

View file

@ -9,6 +9,8 @@ import (
"github.com/kschamplin/gotelem/skylab"
)
// Broker is a Bus Event broadcast system. You can subscribe to events,
// and send events.
type Broker struct {
subs map[string]chan skylab.BusEvent // contains the channel for each subsciber
@ -17,6 +19,7 @@ type Broker struct {
bufsize int // size of chan buffer in elements.
}
// NewBroker creates a new broker with a given logger.
func NewBroker(bufsize int, logger *slog.Logger) *Broker {
return &Broker{
subs: make(map[string]chan skylab.BusEvent),
@ -25,6 +28,7 @@ func NewBroker(bufsize int, logger *slog.Logger) *Broker {
}
}
// Subscribe joins the broker with the given name. The name must be unique.
func (b *Broker) Subscribe(name string) (ch chan skylab.BusEvent, err error) {
// get rw lock.
b.lock.Lock()
@ -33,23 +37,33 @@ func (b *Broker) Subscribe(name string) (ch chan skylab.BusEvent, err error) {
if ok {
return nil, errors.New("name already in use")
}
b.logger.Info("new subscriber", "name", name)
b.logger.Info("subscribe", "name", name)
ch = make(chan skylab.BusEvent, b.bufsize)
b.subs[name] = ch
return
}
// Unsubscribe removes a subscriber matching the name. It doesn't do anything
// if there's nobody subscribed with that name
func (b *Broker) Unsubscribe(name string) {
// remove the channel from the map. We don't need to close it.
b.lock.Lock()
defer b.lock.Unlock()
delete(b.subs, name)
b.logger.Debug("unsubscribe", "name", name)
if _, ok := b.subs[name]; ok {
close(b.subs[name])
delete(b.subs, name)
}
}
// Publish sends a bus event to all subscribers. It includes a sender
// string which prevents loopback.
func (b *Broker) Publish(sender string, message skylab.BusEvent) {
b.lock.RLock()
defer b.lock.RUnlock()
b.logger.Debug("publish", "sender", sender, "message", message)
for name, ch := range b.subs {
if name == sender {
continue
@ -57,7 +71,6 @@ func (b *Broker) Publish(sender string, message skylab.BusEvent) {
// non blocking send.
select {
case ch <- message:
b.logger.Debug("sent message", "dest", name, "src", sender)
default:
b.logger.Warn("recipient buffer full", "dest", name)
}

122
broker_test.go Normal file
View file

@ -0,0 +1,122 @@
package gotelem
import (
"log/slog"
"os"
"sync"
"testing"
"time"
"github.com/kschamplin/gotelem/skylab"
)
func makeEvent() skylab.BusEvent {
var pkt skylab.Packet = &skylab.BmsMeasurement{
BatteryVoltage: 12000,
AuxVoltage: 24000,
Current: 1.23,
}
return skylab.BusEvent{
Timestamp: time.Now(),
Name: pkt.String(),
Data: pkt,
}
}
func TestBroker(t *testing.T) {
t.Parallel()
t.Run("test send", func(t *testing.T) {
flog := slog.New(slog.NewTextHandler(os.Stderr, nil))
broker := NewBroker(10, flog)
sub, err := broker.Subscribe("testSub")
if err != nil {
t.Fatalf("error subscribing: %v", err)
}
testEvent := makeEvent()
go func() {
time.Sleep(time.Millisecond * 1)
broker.Publish("other", testEvent)
}()
var recvEvent skylab.BusEvent
select {
case recvEvent = <-sub:
if !testEvent.Equals(&recvEvent) {
t.Fatalf("events not equal, want %v got %v", testEvent, recvEvent)
}
case <-time.After(1 * time.Second):
t.Fatalf("timeout waiting for packet")
}
})
t.Run("multiple broadcast", func(t *testing.T) {
flog := slog.New(slog.NewTextHandler(os.Stderr, nil))
broker := NewBroker(10, flog)
testEvent := makeEvent()
wg := sync.WaitGroup{}
clientFn := func(name string) {
sub, err := broker.Subscribe(name)
if err != nil {
t.Log(err)
return
}
<-sub
wg.Done()
}
wg.Add(2)
go clientFn("client1")
go clientFn("client2")
// yes this is stupid. otherwise we race.
time.Sleep(10 * time.Millisecond)
broker.Publish("sender", testEvent)
done := make(chan bool)
go func() {
wg.Wait()
done <- true
}()
select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for clients")
}
})
t.Run("name collision", func(t *testing.T) {
flog := slog.New(slog.NewTextHandler(os.Stderr, nil))
broker := NewBroker(10, flog)
_, err := broker.Subscribe("collide")
if err != nil {
t.Fatal(err)
}
_, err = broker.Subscribe("collide")
if err == nil {
t.Fatal("expected error, got nil")
}
})
t.Run("unsubscribe", func(t *testing.T) {
flog := slog.New(slog.NewTextHandler(os.Stderr, nil))
broker := NewBroker(10, flog)
ch, err := broker.Subscribe("test")
if err != nil {
t.Fatal(err)
}
broker.Unsubscribe("test")
_, ok := <-ch
if ok {
t.Fatal("expected dead channel, but channel returned result")
}
})
}

View file

@ -9,7 +9,7 @@ import (
"strings"
"sync/atomic"
"github.com/kschamplin/gotelem/internal/db"
"github.com/kschamplin/gotelem"
"github.com/kschamplin/gotelem/internal/logparsers"
"github.com/kschamplin/gotelem/skylab"
"github.com/urfave/cli/v2"
@ -81,7 +81,7 @@ func importAction(ctx *cli.Context) error {
}
dbPath := ctx.Path("database")
db, err := db.OpenTelemDb(dbPath)
db, err := gotelem.OpenTelemDb(dbPath)
if err != nil {
return fmt.Errorf("error opening database: %w", err)
}

View file

@ -5,14 +5,14 @@ import (
"fmt"
"io"
"net/http"
"math"
"time"
"os"
"sync"
"log/slog"
"github.com/kschamplin/gotelem"
"github.com/kschamplin/gotelem/internal/api"
"github.com/kschamplin/gotelem/internal/db"
"github.com/kschamplin/gotelem/skylab"
"github.com/kschamplin/gotelem/xbee"
"github.com/urfave/cli/v2"
@ -37,6 +37,10 @@ var serveFlags = []cli.Flag{
DefaultText: "gotelem.db",
Usage: "database to serve, if not specified will use memory",
},
&cli.BoolFlag{
Name: "demo",
Usage: "enable the demo packet stream",
},
}
var serveCmd = &cli.Command{
@ -54,12 +58,11 @@ var serveCmd = &cli.Command{
type service interface {
fmt.Stringer
Start(cCtx *cli.Context, deps svcDeps) (err error)
Status()
}
type svcDeps struct {
Broker *gotelem.Broker
Db *db.TelemDb
Db *gotelem.TelemDb
Logger *slog.Logger
}
@ -68,9 +71,8 @@ type svcDeps struct {
// or if certain features are present (see cli/sqlite.go)
var serveThings = []service{
&xBeeService{},
// &canLoggerService{},
&dbWriterService{},
&httpService{},
&DemoService{},
}
func serve(cCtx *cli.Context) error {
@ -94,12 +96,12 @@ func serve(cCtx *cli.Context) error {
broker := gotelem.NewBroker(20, logger.WithGroup("broker"))
// open database
dbPath := "file::memory:?cache=shared"
dbPath := "gotelem.db"
if cCtx.IsSet("db") {
dbPath = cCtx.Path("db")
}
logger.Info("opening database", "path", dbPath)
db, err := db.OpenTelemDb(dbPath)
db, err := gotelem.OpenTelemDb(dbPath)
if err != nil {
return err
}
@ -113,17 +115,17 @@ func serve(cCtx *cli.Context) error {
}
for _, svc := range serveThings {
logger.Info("starting service", "svc", svc.String())
logger.Info("starting service", "service", svc.String())
wg.Add(1)
go func(mySvc service, baseLogger *slog.Logger) {
svcLogger := logger.With("svc", mySvc.String())
svcLogger := logger.With("service", mySvc.String())
s := deps
s.Logger = svcLogger
defer wg.Done()
// TODO: recover
err := mySvc.Start(cCtx, s)
if err != nil {
logger.Error("service stopped!", "err", err, "svc", mySvc.String())
logger.Error("service stopped!", "err", err, "service", mySvc.String())
}
}(svc, logger)
}
@ -148,6 +150,7 @@ func (x *xBeeService) Status() {
func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
logger := deps.Logger
broker := deps.Broker
tdb := deps.Db
if cCtx.String("xbee") == "" {
logger.Info("not using xbee")
return
@ -175,8 +178,6 @@ func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
xbeeTxer := json.NewEncoder(x.session)
xbeeRxer := json.NewDecoder(x.session)
// xbeePackets := make(chan skylab.BusEvent)
// background task to read json packets off of the xbee and send them to the
go func() {
for {
var p skylab.BusEvent
@ -185,6 +186,7 @@ func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
logger.Error("failed to decode xbee packet")
}
broker.Publish("xbee", p)
tdb.AddEventsCtx(cCtx.Context, p)
}
}()
for {
@ -194,7 +196,7 @@ func (x *xBeeService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
return
case msg := <-rxCh:
logger.Info("got msg", "msg", msg)
xbeeTxer.Encode(msg)
err := xbeeTxer.Encode(msg)
if err != nil {
logger.Warn("error writing to xbee", "err", err)
}
@ -220,7 +222,7 @@ func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
broker := deps.Broker
db := deps.Db
r := api.TelemRouter(logger, broker, db)
r := gotelem.TelemRouter(logger, broker, db)
//
@ -240,33 +242,58 @@ func (h *httpService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
return
}
// dbWriterService listens to the CAN packet broker and saves packets to the database.
type dbWriterService struct {
type DemoService struct {
}
func (d *dbWriterService) Status() {
func (d *DemoService) String() string {
return "demo service"
}
func (d *dbWriterService) String() string {
return "db logger"
}
func (d *dbWriterService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
// put CAN packets from the broker into the database.
tdb := deps.Db
rxCh, err := deps.Broker.Subscribe("dbWriter")
defer deps.Broker.Unsubscribe("dbWriter")
// TODO: add buffering + timeout/backpressure
func (d *DemoService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
if !cCtx.Bool("demo") {
return
}
broker := deps.Broker
bmsPkt := &skylab.BmsMeasurement{
Current: 1.23,
BatteryVoltage: 11111,
AuxVoltage: 22222,
}
wslPkt := &skylab.WslVelocity{
MotorVelocity: 0,
VehicleVelocity: 100.0,
}
var next skylab.Packet = bmsPkt
for {
select {
case msg := <-rxCh:
tdb.AddEventsCtx(cCtx.Context, msg)
case <-cCtx.Done():
return
case <-time.After(100 * time.Millisecond):
// send the next packet.
if next == bmsPkt {
bmsPkt.Current = float32(math.Sin(float64(time.Now().UnixMilli()) / 2000.0))
ev := skylab.BusEvent{
Timestamp: time.Now(),
Name: next.String(),
Data: next,
}
broker.Publish("livestream", ev)
next = wslPkt
} else {
// send the wsl
ev := skylab.BusEvent{
Timestamp: time.Now(),
Name: next.String(),
Data: next,
}
broker.Publish("livestream", ev)
next = bmsPkt
}
}
}
}

View file

@ -3,6 +3,8 @@
package cli
import (
"errors"
"io"
"time"
"github.com/kschamplin/gotelem/internal/can"
@ -55,9 +57,10 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
logger := deps.Logger
broker := deps.Broker
tdb := deps.Db
if !cCtx.IsSet("can") {
logger.Info("no can device provided")
logger.Debug("no can device provided, skip")
return
}
@ -82,6 +85,9 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
go func() {
for {
pkt, err := s.sock.Recv()
if errors.Is(err, io.EOF) {
return
}
if err != nil {
logger.Warn("error receiving CAN packet", "err", err)
}
@ -95,23 +101,29 @@ func (s *socketCANService) Start(cCtx *cli.Context, deps svcDeps) (err error) {
case msg := <-rxCh:
frame, err = skylab.ToCanFrame(msg.Data)
if err != nil {
logger.Warn("error encoding can frame", "name", msg.Name, "err", err)
continue
}
s.sock.Send(&frame)
case msg := <-rxCan:
p, err := skylab.FromCanFrame(msg)
if err != nil {
logger.Warn("error parsing can packet", "id", msg.Id)
logger.Warn("error parsing can packet", "id", msg.Id, "err", err)
continue
}
cde := skylab.BusEvent{
event := skylab.BusEvent{
Timestamp: time.Now(),
Name: p.String(),
Name: p.String(),
Data: p,
}
broker.Publish("socketCAN", cde)
broker.Publish("socketCAN", event)
tdb.AddEventsCtx(cCtx.Context, event)
case <-cCtx.Done():
// close the socket.
s.sock.Close()
return
}
}

View file

@ -92,7 +92,7 @@ func run(ctx *cli.Context) (err error) {
fileReader := bufio.NewReader(istream)
var pfun logparsers.BusParserFunc
var pfun logparsers.BusEventParser
pfun, ok := logparsers.ParsersMap[ctx.String("format")]
if !ok {

388
db.go Normal file
View file

@ -0,0 +1,388 @@
package gotelem
// this file implements the database functions to load/store/read from a sql database.
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/jmoiron/sqlx"
"github.com/kschamplin/gotelem/skylab"
_ "github.com/mattn/go-sqlite3"
)
type TelemDb struct {
db *sqlx.DB
}
// this function is internal use. It actually opens the database, but uses
// a raw path string instead of formatting one like the exported functions.
func OpenRawDb(rawpath string) (tdb *TelemDb, err error) {
tdb = &TelemDb{}
tdb.db, err = sqlx.Connect("sqlite3", rawpath)
if err != nil {
return
}
// perform any database migrations
version, err := tdb.GetVersion()
if err != nil {
return
}
// TODO: use logging instead of printf
fmt.Printf("starting version %d\n", version)
version, err = RunMigrations(tdb)
fmt.Printf("ending version %d\n", version)
return tdb, err
}
// this string is used to open the read-write db.
// the extra options improve performance significantly.
const ProductionDbURI = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000"
// OpenTelemDb opens a new telemetry database at the given path.
func OpenTelemDb(path string) (*TelemDb, error) {
dbStr := fmt.Sprintf(ProductionDbURI, path)
return OpenRawDb(dbStr)
}
func (tdb *TelemDb) GetVersion() (int, error) {
var version int
err := tdb.db.Get(&version, "PRAGMA user_version")
return version, err
}
func (tdb *TelemDb) SetVersion(version int) error {
stmt := fmt.Sprintf("PRAGMA user_version = %d", version)
_, err := tdb.db.Exec(stmt)
return err
}
// sql expression to insert a bus event into the packets database.1
const sqlInsertEvent = `INSERT INTO "bus_events" (ts, name, data) VALUES `
// AddEvent adds the bus event to the database.
func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) {
// edge case - zero events.
if len(events) == 0 {
return 0, nil
}
n = 0
tx, err := tdb.db.BeginTx(ctx, nil)
defer tx.Rollback()
if err != nil {
return
}
sqlStmt := sqlInsertEvent
const rowSql = "(?, ?, json(?))"
inserts := make([]string, len(events))
vals := []interface{}{}
idx := 0 // we have to manually increment, because sometimes we don't insert.
for _, b := range events {
inserts[idx] = rowSql
var j []byte
j, err = json.Marshal(b.Data)
if err != nil {
// we had some error turning the packet into json.
continue // we silently skip.
}
vals = append(vals, b.Timestamp.UnixMilli(), b.Data.String(), j)
idx++
}
// construct the full statement now
sqlStmt = sqlStmt + strings.Join(inserts[:idx], ",")
stmt, err := tx.PrepareContext(ctx, sqlStmt)
// defer stmt.Close()
if err != nil {
return
}
res, err := stmt.ExecContext(ctx, vals...)
if err != nil {
return
}
n, err = res.RowsAffected()
tx.Commit()
return
}
func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) {
return tdb.AddEventsCtx(context.Background(), events...)
}
// LimitOffsetModifier is a modifier to support pagniation.
type LimitOffsetModifier struct {
Limit int
Offset int
}
func (l *LimitOffsetModifier) ModifyStatement(sb *strings.Builder) error {
clause := fmt.Sprintf(" LIMIT %d OFFSET %d", l.Limit, l.Offset)
sb.WriteString(clause)
return nil
}
// BusEventFilter is a filter for bus events.
type BusEventFilter struct {
Names []string // The name(s) of packets to filter for
StartTime time.Time // Starting time range. All packets >= StartTime
EndTime time.Time // Ending time range. All packets <= EndTime
Indexes []int // The specific index of the packets to index.
}
// now we can optionally add a limit.
func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, lim *LimitOffsetModifier) ([]skylab.BusEvent, error) {
// construct a simple
var whereFrags = make([]string, 0)
// if we're filtering by names, add a where clause for it.
if len(filter.Names) > 0 {
// we have to quote our individual names
names := strings.Join(filter.Names, `", "`)
qString := fmt.Sprintf(`name IN ("%s")`, names)
whereFrags = append(whereFrags, qString)
}
// TODO: identify if we need a special case for both time ranges
// using BETWEEN since apparenlty that can be better?
// next, check if we have a start/end time, add constraints
if !filter.EndTime.IsZero() {
qString := fmt.Sprintf("ts <= %d", filter.EndTime.UnixMilli())
whereFrags = append(whereFrags, qString)
}
if !filter.StartTime.IsZero() {
// we have an end range
qString := fmt.Sprintf("ts >= %d", filter.StartTime.UnixMilli())
whereFrags = append(whereFrags, qString)
}
if len(filter.Indexes) > 0 {
s := make([]string, 0)
for _, idx := range filter.Indexes {
s = append(s, fmt.Sprint(idx))
}
idxs := strings.Join(s, ", ")
qString := fmt.Sprintf(`idx in (%s)`, idxs)
whereFrags = append(whereFrags, qString)
}
sb := strings.Builder{}
sb.WriteString(`SELECT ts, name, data from "bus_events"`)
// construct the full statement.
if len(whereFrags) > 0 {
// use the where clauses.
sb.WriteString(" WHERE ")
sb.WriteString(strings.Join(whereFrags, " AND "))
}
sb.WriteString(" ORDER BY ts DESC")
// Augment our data further if there's i.e a limit modifier.
// TODO: factor this out maybe?
if lim != nil {
lim.ModifyStatement(&sb)
}
rows, err := tdb.db.QueryxContext(ctx, sb.String())
if err != nil {
return nil, err
}
defer rows.Close()
var events = make([]skylab.BusEvent, 0, 10)
for rows.Next() {
var ev skylab.RawJsonEvent
err := rows.Scan(&ev.Timestamp, &ev.Name, (*[]byte)(&ev.Data))
if err != nil {
return nil, err
}
BusEv := skylab.BusEvent{
Timestamp: time.UnixMilli(int64(ev.Timestamp)),
Name: ev.Name,
}
BusEv.Data, err = skylab.FromJson(ev.Name, ev.Data)
if err != nil {
return events, nil
}
events = append(events, BusEv)
}
err = rows.Err()
return events, err
}
// We now need a different use-case: we would like to extract a value from
// a specific packet.
// Datum is a single measurement - it is more granular than a packet.
// the classic example is bms_measurement.current
type Datum struct {
Timestamp time.Time `db:"timestamp" json:"ts"`
Value any `db:"val" json:"val"`
}
// GetValues queries the database for values in a given time range.
// A value is a specific data point. For example, bms_measurement.current
// would be a value.
func (tdb *TelemDb) GetValues(ctx context.Context, filter BusEventFilter,
field string, lim *LimitOffsetModifier) ([]Datum, error) {
// this fragment uses json_extract from sqlite to get a single
// nested value.
sb := strings.Builder{}
sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `)
if len(filter.Names) != 1 {
return nil, errors.New("invalid number of names")
}
whereFrags := []string{"name is ?"}
if !filter.StartTime.IsZero() {
qString := fmt.Sprintf("ts >= %d", filter.StartTime.UnixMilli())
whereFrags = append(whereFrags, qString)
}
if !filter.EndTime.IsZero() {
qString := fmt.Sprintf("ts <= %d", filter.EndTime.UnixMilli())
whereFrags = append(whereFrags, qString)
}
if len(filter.Indexes) > 0 {
s := make([]string, 0)
for _, idx := range filter.Indexes {
s = append(s, fmt.Sprint(idx))
}
idxs := strings.Join(s, ", ")
qString := fmt.Sprintf(`idx in (%s)`, idxs)
whereFrags = append(whereFrags, qString)
}
// join qstrings with AND
sb.WriteString(strings.Join(whereFrags, " AND "))
sb.WriteString(" ORDER BY ts DESC")
if lim != nil {
lim.ModifyStatement(&sb)
}
rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, filter.Names[0])
if err != nil {
return nil, err
}
defer rows.Close()
data := make([]Datum, 0, 10)
for rows.Next() {
var d Datum = Datum{}
var ts int64
err = rows.Scan(&ts, &d.Value)
d.Timestamp = time.UnixMilli(ts)
if err != nil {
fmt.Print(err)
return data, err
}
data = append(data, d)
}
fmt.Print(rows.Err())
return data, nil
}
// AddDocument inserts a new document to the store if it is unique and valid.
func (tdb *TelemDb) AddDocument(ctx context.Context, obj json.RawMessage) error {
const insertStmt = `INSERT INTO openmct_objects (data) VALUES (json(?))`
_, err := tdb.db.ExecContext(ctx, insertStmt, obj)
return err
}
// DocumentNotFoundError is when the underlying document cannot be found.
type DocumentNotFoundError string
func (e DocumentNotFoundError) Error() string {
return fmt.Sprintf("document could not find key: %s", string(e))
}
// UpdateDocument replaces the entire contents of a document matching
// the given key. Note that the key is derived from the document,
// and no checks are done to ensure that the new key is the same.
func (tdb *TelemDb) UpdateDocument(ctx context.Context, key string,
obj json.RawMessage) error {
const upd = `UPDATE openmct_objects SET data = json(?) WHERE key IS ?`
r, err := tdb.db.ExecContext(ctx, upd, obj, key)
if err != nil {
return err
}
n, err := r.RowsAffected()
if err != nil {
return err
}
if n != 1 {
return DocumentNotFoundError(key)
}
return err
}
// GetDocument gets the document matching the corresponding key.
func (tdb *TelemDb) GetDocument(ctx context.Context, key string) (json.RawMessage, error) {
const get = `SELECT data FROM openmct_objects WHERE key IS ?`
row := tdb.db.QueryRowxContext(ctx, get, key)
var res []byte // VERY important, json.RawMessage won't work here
// since the scan function does not look at underlying types.
row.Scan(&res)
if len(res) == 0 {
return nil, DocumentNotFoundError(key)
}
return res, nil
}
// GetAllDocuments returns all documents in the database.
func (tdb *TelemDb) GetAllDocuments(ctx context.Context) ([]json.RawMessage, error) {
const getall = `SELECT data FROM openmct_objects`
rows, err := tdb.db.QueryxContext(ctx, getall)
if err != nil {
return nil, err
}
defer rows.Close()
docs := make([]json.RawMessage, 0)
for rows.Next() {
var j json.RawMessage
rows.Scan(&j)
docs = append(docs, j)
}
return docs, nil
}
// DeleteDocument removes a document from the store, or errors
// if it does not exist.
func (tdb *TelemDb) DeleteDocument(ctx context.Context, key string) error {
const del = `DELETE FROM openmct_objects WHERE key IS ?`
res, err := tdb.db.ExecContext(ctx, del, key)
if err != nil {
return err
}
n, err := res.RowsAffected()
if err != nil {
return err
}
if n != 1 {
return DocumentNotFoundError(key)
}
return err
}

325
db_test.go Normal file
View file

@ -0,0 +1,325 @@
package gotelem
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"reflect"
"strings"
"testing"
"time"
"github.com/kschamplin/gotelem/internal/logparsers"
"github.com/kschamplin/gotelem/skylab"
)
// helper func to get a random bus event with random data.
func GetRandomBusEvent() skylab.BusEvent {
data := skylab.WsrVelocity{
MotorVelocity: 1.0,
VehicleVelocity: 4.0,
}
ev := skylab.BusEvent{
Timestamp: time.Now(),
Data: &data,
}
return ev
}
// exampleData is a telemetry log data snippet that
// we use to seed the database.
const exampleData = `1698013005.164 1455ED8FDBDFF4FC3BD
1698013005.168 1460000000000000000
1698013005.170 1470000000000000000
1698013005.172 1610000000000000000
1698013005.175 1210000000000000000
1698013005.177 157FFFFC74200000000
1698013005.181 1030000000000000000
1698013005.184 1430000000000000000
1698013005.187 04020D281405EA8FB41
1698013005.210 0413BDF81406AF70042
1698013005.212 042569F81408EF0FF41
1698013005.215 04358A8814041060242
1698013005.219 04481958140D2A40342
1698013005.221 0452DB2814042990442
1698013005.224 047AF948140C031FD41
1698013005.226 04B27A081401ACD0B42
1698013005.229 04DCEAA81403C8C0A42
1698013005.283 04E0378814024580142
1698013005.286 04F97908140BFBC0142
1698013005.289 050098A81402F0F0A42
1698013005.293 051E6AE81402AF20842
1698013005.297 0521AC081403A970742
1698013005.300 0535BB181403CEB0542
1698013005.304 054ECC0814088FE0142
1698013005.307 0554ED181401F44F341
1698013005.309 05726E48140D42BEB41
1698013005.312 059EFC98140EC400142
`
// MakeMockDatabase creates a new dummy database.
func MakeMockDatabase(name string) *TelemDb {
fstring := fmt.Sprintf("file:%s?mode=memory&cache=shared", name)
tdb, err := OpenRawDb(fstring)
if err != nil {
panic(err)
}
return tdb
}
func SeedMockDatabase(tdb *TelemDb) {
// seed the database now.
scanner := bufio.NewScanner(strings.NewReader(exampleData))
for scanner.Scan() {
str := scanner.Text()
bev, err := logparsers.ParsersMap["telem"](str)
if err != nil {
panic(err)
}
_, err = tdb.AddEvents(bev)
if err != nil {
panic(err)
}
}
}
func GetSeedEvents() []skylab.BusEvent {
evs := make([]skylab.BusEvent, 0)
scanner := bufio.NewScanner(strings.NewReader(exampleData))
for scanner.Scan() {
str := scanner.Text()
bev, err := logparsers.ParsersMap["telem"](str)
if err != nil {
panic(err)
}
evs = append(evs, bev)
}
return evs
}
func TestTelemDb(t *testing.T) {
t.Run("test opening database", func(t *testing.T) {
// create our mock
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
})
t.Run("test inserting bus event", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
type args struct {
events []skylab.BusEvent
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "add no packet",
args: args{
events: []skylab.BusEvent{},
},
wantErr: false,
},
{
name: "add single packet",
args: args{
events: []skylab.BusEvent{GetRandomBusEvent()},
},
wantErr: false,
},
{
name: "add multiple packet",
args: args{
events: []skylab.BusEvent{GetRandomBusEvent(), GetRandomBusEvent()},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if _, err := tdb.AddEvents(tt.args.events...); (err != nil) != tt.wantErr {
t.Errorf("TelemDb.AddEvents() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
})
t.Run("test getting packets", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
SeedMockDatabase(tdb)
ctx := context.Background()
f := BusEventFilter{}
limitMod := &LimitOffsetModifier{Limit: 1}
pkt, err := tdb.GetPackets(ctx, f, limitMod)
if err != nil {
t.Fatalf("error getting packets: %v", err)
}
if len(pkt) != 1 {
t.Fatalf("expected exactly one response, got %d", len(pkt))
}
// todo - validate what this should be.
})
t.Run("test read-write packet", func(t *testing.T) {
})
}
func MockDocument(key string) json.RawMessage {
var v = make(map[string]interface{})
v["identifier"] = map[string]string{"key": key}
v["randomdata"] = rand.Int()
res, err := json.Marshal(v)
if err != nil {
panic(err)
}
return res
}
func TestDbDocuments(t *testing.T) {
t.Run("test inserting a document", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
err := tdb.AddDocument(ctx, MockDocument("hi"))
if err != nil {
t.Fatalf("AddDocument expected no error, got err=%v", err)
}
})
t.Run("test inserting duplicate documents", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
doc := MockDocument("hi")
err := tdb.AddDocument(ctx, doc)
if err != nil {
t.Fatalf("AddDocument expected no error, got err=%v", err)
}
err = tdb.AddDocument(ctx, doc)
if err == nil {
t.Fatalf("AddDocument expected duplicate key error, got nil")
}
})
t.Run("test inserting bad document", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
var badDoc = map[string]string{"bad": "duh"}
msg, err := json.Marshal(badDoc)
if err != nil {
panic(err)
}
err = tdb.AddDocument(ctx, msg)
if err == nil {
t.Fatalf("AddDocument expected error, got nil")
}
})
t.Run("test getting document", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
doc := MockDocument("hi")
err := tdb.AddDocument(ctx, doc)
if err != nil {
t.Fatalf("AddDocument expected no error, got err=%v", err)
}
res, err := tdb.GetDocument(ctx, "hi")
if err != nil {
t.Fatalf("GetDocument expected no error, got err=%v", err)
}
if !reflect.DeepEqual(res, doc) {
t.Fatalf("GetDocument did not return identical document")
}
})
t.Run("test getting nonexistent document", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
res, err := tdb.GetDocument(ctx, "hi")
if err == nil || !errors.Is(err, DocumentNotFoundError("hi")) {
t.Fatalf("GetDocument expected DocumentNotFoundError, got %v", err)
}
if res != nil {
t.Fatalf("GetDocument expected nil result, got %v", res)
}
})
t.Run("test update document", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
doc1 := MockDocument("hi")
doc2 := MockDocument("hi") // same key, we want to update.
tdb.AddDocument(ctx, doc1)
err := tdb.UpdateDocument(ctx, "hi", doc2)
if err != nil {
t.Fatalf("UpdateDocument expected no error, got err=%v", err)
}
// compare.
res, _ := tdb.GetDocument(ctx, "hi")
if !reflect.DeepEqual(res, doc2) {
t.Fatalf("UpdateDocument did not return new doc, got %s", res)
}
})
t.Run("test update nonexistent document", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
doc := MockDocument("hi")
err := tdb.UpdateDocument(ctx, "badKey", doc)
if err == nil {
t.Fatalf("UpdateDocument expected error, got nil")
}
})
t.Run("test delete document", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
doc := MockDocument("hi")
tdb.AddDocument(ctx, doc)
err := tdb.DeleteDocument(ctx, "hi")
if err != nil {
t.Fatalf("DeleteDocument expected no error, got err=%v", err)
}
})
t.Run("test delete nonexistent document", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
ctx := context.Background()
err := tdb.DeleteDocument(ctx, "hi")
if !errors.Is(err, DocumentNotFoundError("hi")) {
t.Fatalf("DeleteDocument expected not found, got err=%v", err)
}
})
}

View file

@ -1,4 +1,4 @@
package api
package gotelem
// this file defines the HTTP handlers and routes.
@ -6,26 +6,95 @@ import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"time"
"log/slog"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/google/uuid"
"github.com/kschamplin/gotelem"
"github.com/kschamplin/gotelem/internal/db"
"github.com/kschamplin/gotelem/skylab"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)
func TelemRouter(log *slog.Logger, broker *gotelem.Broker, db *db.TelemDb) http.Handler {
func extractBusEventFilter(r *http.Request) (*BusEventFilter, error) {
bef := &BusEventFilter{}
v := r.URL.Query()
if v.Has("name") {
bef.Names = v["name"]
}
if el := v.Get("start"); el != "" {
// parse the start time query.
t, err := time.Parse(time.RFC3339, el)
if err != nil {
return bef, err
}
bef.StartTime = t
}
if el := v.Get("end"); el != "" {
// parse the start time query.
t, err := time.Parse(time.RFC3339, el)
if err != nil {
return bef, err
}
bef.EndTime = t
}
if v.Has("idx") {
bef.Indexes = make([]int, 0)
for _, strIdx := range v["idx"] {
idx, err := strconv.ParseInt(strIdx, 10, 32)
if err != nil {
return nil, err
}
bef.Indexes = append(bef.Indexes, int(idx))
}
}
return bef, nil
}
func extractLimitModifier(r *http.Request) (*LimitOffsetModifier, error) {
lim := &LimitOffsetModifier{}
v := r.URL.Query()
if el := v.Get("limit"); el != "" {
val, err := strconv.ParseInt(el, 10, 64)
if err != nil {
return nil, err
}
lim.Limit = int(val)
// next, we check if we have an offset.
// we only check offset if we also have a limit.
// offset without limit isn't valid and is ignored.
if el := v.Get("offset"); el != "" {
val, err := strconv.ParseInt(el, 10, 64)
if err != nil {
return nil, err
}
lim.Offset = int(val)
}
return lim, nil
}
// we use the nil case to indicate that no limit was provided.
return nil, nil
}
type RouterMod func(chi.Router)
var RouterMods = []RouterMod{}
func TelemRouter(log *slog.Logger, broker *Broker, db *TelemDb) http.Handler {
r := chi.NewRouter()
r.Use(middleware.RequestID)
r.Use(middleware.RealIP)
r.Use(middleware.Logger) // TODO: integrate with slog instead of go default logger.
r.Use(middleware.Recoverer)
r.Use(middleware.SetHeader("Access-Control-Allow-Origin", "*"))
// heartbeat request.
r.Get("/ping", func(w http.ResponseWriter, r *http.Request) {
@ -34,6 +103,9 @@ func TelemRouter(log *slog.Logger, broker *gotelem.Broker, db *db.TelemDb) http.
r.Mount("/api/v1", apiV1(broker, db))
for _, mod := range RouterMods {
mod(r)
}
// To future residents - you can add new API calls/systems in /api/v2
// Don't break anything in api v1! keep legacy code working!
@ -41,7 +113,7 @@ func TelemRouter(log *slog.Logger, broker *gotelem.Broker, db *db.TelemDb) http.
}
// define API version 1 routes.
func apiV1(broker *gotelem.Broker, tdb *db.TelemDb) chi.Router {
func apiV1(broker *Broker, tdb *TelemDb) chi.Router {
r := chi.NewRouter()
// this API only accepts JSON.
r.Use(middleware.AllowContentType("application/json"))
@ -56,15 +128,19 @@ func apiV1(broker *gotelem.Broker, tdb *db.TelemDb) chi.Router {
})
r.Route("/packets", func(r chi.Router) {
r.Get("/subscribe", apiV1PacketSubscribe(broker, tdb))
r.Get("/subscribe", apiV1PacketSubscribe(broker))
r.Post("/", func(w http.ResponseWriter, r *http.Request) {
var pkgs []skylab.BusEvent
var pkts []skylab.BusEvent
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&pkgs); err != nil {
if err := decoder.Decode(&pkts); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
tdb.AddEvents(pkgs...)
conn_id := r.RemoteAddr + uuid.NewString()
for _, pkt := range pkts {
broker.Publish(conn_id, pkt)
}
tdb.AddEventsCtx(r.Context(), pkts...)
})
// general packet history get.
r.Get("/", apiV1GetPackets(tdb))
@ -74,15 +150,10 @@ func apiV1(broker *gotelem.Broker, tdb *db.TelemDb) chi.Router {
})
// records are driving segments/runs.
r.Route("/records", func(r chi.Router) {
r.Get("/", apiV1GetRecords(tdb)) // get all runs
r.Get("/active", apiV1GetActiveRecord(tdb)) // get current run (no end time)
r.Post("/", apiV1StartRecord(tdb)) // create a new run (with note). Ends active run if any, and creates new active run (no end time)
r.Get("/{id}", apiV1GetRecord(tdb)) // get details on a specific run
r.Put("/{id}", apiV1UpdateRecord(tdb)) // update a specific run. Can only be used to add notes/metadata, and not to change time/id.
// OpenMCT domain object storage. Basically an arbitrary JSON document store
r.Route("/openmct", apiV1OpenMCTStore(tdb))
})
// records are driving segments/runs.
r.Get("/stats", func(w http.ResponseWriter, r *http.Request) {
@ -91,9 +162,8 @@ func apiV1(broker *gotelem.Broker, tdb *db.TelemDb) chi.Router {
return r
}
// this is a websocket stream.
func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFunc {
func apiV1PacketSubscribe(broker *Broker) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// pull filter from url query params.
bef, err := extractBusEventFilter(r)
@ -101,7 +171,7 @@ func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFu
http.Error(w, err.Error(), http.StatusInternalServerError)
}
// setup connection
conn_id := r.RemoteAddr + uuid.New().String()
conn_id := r.RemoteAddr + uuid.NewString()
sub, err := broker.Subscribe(conn_id)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
@ -111,19 +181,18 @@ func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFu
defer broker.Unsubscribe(conn_id)
// setup websocket
c, err := websocket.Accept(w, r, nil)
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
InsecureSkipVerify: true,
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
c.Ping(r.Context())
// closeread handles protocol/status messages,
// also handles clients closing the connection.
// we get a context to use from it.
ctx := c.CloseRead(r.Context())
for {
select {
case <-ctx.Done():
@ -148,7 +217,7 @@ func apiV1PacketSubscribe(broker *gotelem.Broker, db *db.TelemDb) http.HandlerFu
}
}
func apiV1GetPackets(tdb *db.TelemDb) http.HandlerFunc {
func apiV1GetPackets(tdb *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// this should use http query params to return a list of packets.
bef, err := extractBusEventFilter(r)
@ -163,21 +232,11 @@ func apiV1GetPackets(tdb *db.TelemDb) http.HandlerFunc {
return
}
// TODO: is the following check needed?
var res []skylab.BusEvent
if lim != nil {
res, err = tdb.GetPackets(r.Context(), *bef, lim)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
} else {
res, err = tdb.GetPackets(r.Context(), *bef)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
res, err = tdb.GetPackets(r.Context(), *bef, lim)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
b, err := json.Marshal(res)
if err != nil {
@ -192,7 +251,7 @@ func apiV1GetPackets(tdb *db.TelemDb) http.HandlerFunc {
// apiV1GetValues is a function that creates a handler for
// getting the specific value from a packet.
// this is useful for OpenMCT or other viewer APIs
func apiV1GetValues(db *db.TelemDb) http.HandlerFunc {
func apiV1GetValues(db *TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var err error
@ -214,7 +273,9 @@ func apiV1GetValues(db *db.TelemDb) http.HandlerFunc {
// override the bus event filter name option
bef.Names = []string{name}
res, err := db.GetValues(r.Context(), *bef, field, lim)
var res []Datum
// make the call, skip the limit modifier if it's nil.
res, err = db.GetValues(r.Context(), *bef, field, lim)
if err != nil {
// 500 server error:
http.Error(w, err.Error(), http.StatusInternalServerError)
@ -230,27 +291,15 @@ func apiV1GetValues(db *db.TelemDb) http.HandlerFunc {
}
// TODO: rename. record is not a clear name. Runs? drives? segments?
func apiV1GetRecords(db *db.TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
func apiV1OpenMCTStore(db *TelemDb) func(chi.Router) {
return func(r chi.Router) {
// key is a column on our json store, it's nested under identifier.key
r.Get("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Put("/{key}", func(w http.ResponseWriter, r *http.Request) {})
r.Delete("/{key}", func(w http.ResponseWriter, r *http.Request) {})
// create a new object.
r.Post("/", func(w http.ResponseWriter, r *http.Request) {})
// subscribe to object updates.
r.Get("/subscribe", func(w http.ResponseWriter, r *http.Request) {})
}
}
func apiV1GetActiveRecord(db *db.TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
}
}
func apiV1StartRecord(db *db.TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {}
}
func apiV1GetRecord(db *db.TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {}
}
func apiV1UpdateRecord(db *db.TelemDb) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {}
}

217
http_test.go Normal file
View file

@ -0,0 +1,217 @@
package gotelem
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"testing"
"time"
"github.com/kschamplin/gotelem/skylab"
)
func Test_extractBusEventFilter(t *testing.T) {
makeReq := func(path string) *http.Request {
return httptest.NewRequest(http.MethodGet, path, nil)
}
tests := []struct {
name string
req *http.Request
want *BusEventFilter
wantErr bool
}{
{
name: "test no extractions",
req: makeReq("http://localhost/"),
want: &BusEventFilter{},
wantErr: false,
},
{
name: "test single name extract",
req: makeReq("http://localhost/?name=hi"),
want: &BusEventFilter{
Names: []string{"hi"},
},
wantErr: false,
},
{
name: "test multi name extract",
req: makeReq("http://localhost/?name=hi1&name=hi2"),
want: &BusEventFilter{
Names: []string{"hi1", "hi2"},
},
wantErr: false,
},
{
name: "test start time valid extract",
req: makeReq(fmt.Sprintf("http://localhost/?start=%s", url.QueryEscape(time.Unix(160000000, 0).Format(time.RFC3339)))),
want: &BusEventFilter{
StartTime: time.Unix(160000000, 0),
},
wantErr: false,
},
// {
// name: "test start time invalid extract",
// req: makeReq(fmt.Sprintf("http://localhost/?start=%s", url.QueryEscape("ajlaskdj"))),
// wantErr: true,
// },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Logf("Testing URL %s", tt.req.URL.String())
got, err := extractBusEventFilter(tt.req)
if (err != nil) != tt.wantErr {
t.Errorf("extractBusEventFilter() error = %v, wantErr %v", err, tt.wantErr)
return
}
// we have to manually compare fields because timestamps can't be deeply compared.
if !reflect.DeepEqual(got.Names, tt.want.Names) {
t.Errorf("extractBusEventFilter() Names bad = %v, want %v", got.Names, tt.want.Names)
}
if !reflect.DeepEqual(got.Indexes, tt.want.Indexes) {
t.Errorf("extractBusEventFilter() Indexes bad = %v, want %v", got.Indexes, tt.want.Indexes)
}
if !got.StartTime.Equal(tt.want.StartTime) {
t.Errorf("extractBusEventFilter() StartTime mismatch = %v, want %v", got.StartTime, tt.want.StartTime)
}
if !got.EndTime.Equal(tt.want.EndTime) {
t.Errorf("extractBusEventFilter() EndTime mismatch = %v, want %v", got.EndTime, tt.want.EndTime)
}
})
}
}
func Test_extractLimitModifier(t *testing.T) {
makeReq := func(path string) *http.Request {
return httptest.NewRequest(http.MethodGet, path, nil)
}
tests := []struct {
name string
req *http.Request
want *LimitOffsetModifier
wantErr bool
}{
{
name: "test no limit/offset",
req: makeReq("http://localhost/"),
want: nil,
wantErr: false,
},
{
name: "test limit, no offset",
req: makeReq("http://localhost/?limit=10"),
want: &LimitOffsetModifier{Limit: 10},
wantErr: false,
},
{
name: "test limit and offset",
req: makeReq("http://localhost/?limit=100&offset=200"),
want: &LimitOffsetModifier{Limit: 100, Offset: 200},
wantErr: false,
},
{
name: "test only offset",
req: makeReq("http://localhost/?&offset=200"),
want: nil,
wantErr: false,
},
{
name: "test bad limit",
req: makeReq("http://localhost/?limit=aaaa"),
want: nil,
wantErr: true,
},
{
name: "test good limit, bad offset",
req: makeReq("http://localhost/?limit=10&offset=jjjj"),
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := extractLimitModifier(tt.req)
if (err != nil) != tt.wantErr {
t.Errorf("extractLimitModifier() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("extractLimitModifier() = %v, want %v", got, tt.want)
}
})
}
}
func Test_ApiV1GetPackets(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
SeedMockDatabase(tdb)
evs := GetSeedEvents()
handler := apiV1GetPackets(tdb)
tests := []struct{
name string
req *http.Request
statusCode int
expectedResults []skylab.BusEvent
}{
{
name: "get all packets test",
req: httptest.NewRequest(http.MethodGet, "http://localhost/", nil),
statusCode: http.StatusOK,
expectedResults: evs,
},
{
name: "filter name test",
req: httptest.NewRequest(http.MethodGet, "http://localhost/?name=bms_module", nil),
statusCode: http.StatusOK,
expectedResults: func() []skylab.BusEvent {
filtered := make([]skylab.BusEvent, 0)
for _, pkt := range evs {
if pkt.Name == "bms_module" {
filtered = append(filtered, pkt)
}
}
return filtered
}(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// construct the recorder
w := httptest.NewRecorder()
handler(w, tt.req)
resp := w.Result()
if tt.statusCode != resp.StatusCode {
t.Errorf("incorrect status code: expected %d got %d", tt.statusCode, resp.StatusCode)
}
decoder := json.NewDecoder(resp.Body)
var resultEvents []skylab.BusEvent
err := decoder.Decode(&resultEvents)
if err != nil {
t.Fatalf("could not parse JSON response: %v", err)
}
if len(resultEvents) != len(tt.expectedResults) {
t.Fatalf("response length did not match, want %d got %d", len(tt.expectedResults), len(resultEvents))
}
// Note, the results are flipped here. We return earliest first.
for idx := range tt.expectedResults {
expected := tt.expectedResults[idx]
actual := resultEvents[len(resultEvents) - 1 - idx]
if !expected.Equals(&actual) {
t.Errorf("packet did not match, want %v got %v", expected, actual)
}
}
})
}
}

View file

@ -1,59 +0,0 @@
package api
// This file contains common behaviors that are used across various requests
import (
"net/http"
"strconv"
"time"
"github.com/kschamplin/gotelem/internal/db"
)
func extractBusEventFilter(r *http.Request) (*db.BusEventFilter, error) {
bef := &db.BusEventFilter{}
v := r.URL.Query()
bef.Names = v["name"] // put all the names in.
if el := v.Get("start"); el != "" {
// parse the start time query.
t, err := time.Parse(time.RFC3339, el)
if err != nil {
return bef, err
}
bef.TimerangeStart = t
}
if el := v.Get("end"); el != "" {
// parse the start time query.
t, err := time.Parse(time.RFC3339, el)
if err != nil {
return bef, err
}
bef.TimerangeStart = t
}
return bef, nil
}
func extractLimitModifier(r *http.Request) (*db.LimitOffsetModifier, error) {
lim := &db.LimitOffsetModifier{}
v := r.URL.Query()
if el := v.Get("limit"); el != "" {
val, err := strconv.ParseInt(el, 10, 64)
if err != nil {
return nil, err
}
lim.Limit = int(val)
// next, we check if we have an offset.
// we only check offset if we also have a limit.
// offset without limit isn't valid and is ignored.
if el := v.Get("offset"); el != "" {
val, err := strconv.ParseInt(el, 10, 64)
if err != nil {
return nil, err
}
lim.Offset = int(val)
}
return lim, nil
}
// we use the nil case to indicate that no limit was provided.
return nil, nil
}

View file

@ -6,23 +6,22 @@
// by writing "adapters" to various devices/formats (xbee, socketcan)
package can
type CanID struct {
Id uint32
Id uint32
Extended bool // since the id itself is not enough.
}
// Frame represents a protocol-agnostic CAN frame. The Id can be standard or extended,
// but if it is extended, the Kind should be EFF.
type Frame struct {
Id CanID
Id CanID
Data []byte
Kind Kind
}
// TODO: should this be replaced
type CANFrame interface {
Id()
Id()
Data() []byte
Type() Kind
}
@ -34,8 +33,8 @@ type Kind uint8
const (
CanDataFrame Kind = iota // Standard ID Frame
CanRTRFrame // Remote Transmission Request Frame
CanErrFrame // Error Frame
CanRTRFrame // Remote Transmission Request Frame
CanErrFrame // Error Frame
)
// CanFilter is a basic filter for masking out data. It has an Inverted flag

View file

@ -1,152 +0,0 @@
package db
// this file implements the database functions to load/store/read from a sql database.
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/jmoiron/sqlx"
"github.com/kschamplin/gotelem/skylab"
_ "github.com/mattn/go-sqlite3"
)
type TelemDb struct {
db *sqlx.DB
}
// TelemDbOption lets you customize the behavior of the sqlite database
type TelemDbOption func(*TelemDb) error
// this function is internal use. It actually opens the database, but uses
// a raw path string instead of formatting one like the exported functions.
func OpenRawDb(rawpath string, options ...TelemDbOption) (tdb *TelemDb, err error) {
tdb = &TelemDb{}
tdb.db, err = sqlx.Connect("sqlite3", rawpath)
if err != nil {
return
}
for _, fn := range options {
err = fn(tdb)
if err != nil {
return
}
}
// perform any database migrations
version, err := tdb.GetVersion()
if err != nil {
return
}
// TODO: use logging instead of printf
fmt.Printf("starting version %d\n", version)
version, err = RunMigrations(tdb)
fmt.Printf("ending version %d\n", version)
return tdb, err
}
// this string is used to open the read-write db.
// the extra options improve performance significantly.
const ProductionDbURI = "file:%s?_journal_mode=wal&mode=rwc&_txlock=immediate&_timeout=10000"
// OpenTelemDb opens a new telemetry database at the given path.
func OpenTelemDb(path string, options ...TelemDbOption) (*TelemDb, error) {
dbStr := fmt.Sprintf(ProductionDbURI, path)
return OpenRawDb(dbStr, options...)
}
func (tdb *TelemDb) GetVersion() (int, error) {
var version int
err := tdb.db.Get(&version, "PRAGMA user_version")
return version, err
}
func (tdb *TelemDb) SetVersion(version int) error {
stmt := fmt.Sprintf("PRAGMA user_version = %d", version)
_, err := tdb.db.Exec(stmt)
return err
}
// sql expression to insert a bus event into the packets database.1
const sqlInsertEvent =`INSERT INTO "bus_events" (ts, name, data) VALUES `
// AddEvent adds the bus event to the database.
func (tdb *TelemDb) AddEventsCtx(ctx context.Context, events ...skylab.BusEvent) (n int64, err error) {
// edge case - zero events.
if len(events) == 0 {
return 0, nil
}
n = 0
tx, err := tdb.db.BeginTx(ctx, nil)
defer tx.Rollback()
if err != nil {
return
}
sqlStmt := sqlInsertEvent
const rowSql = "(?, ?, json(?))"
inserts := make([]string, len(events))
vals := []interface{}{}
idx := 0 // we have to manually increment, because sometimes we don't insert.
for _, b := range events {
inserts[idx] = rowSql
var j []byte
j, err = json.Marshal(b.Data)
if err != nil {
// we had some error turning the packet into json.
continue // we silently skip.
}
vals = append(vals, b.Timestamp.UnixMilli(), b.Data.String(), j)
idx++
}
// construct the full statement now
sqlStmt = sqlStmt + strings.Join(inserts[:idx], ",")
stmt, err := tx.PrepareContext(ctx, sqlStmt)
// defer stmt.Close()
if err != nil {
return
}
res, err := stmt.ExecContext(ctx, vals...)
if err != nil {
return
}
n, err = res.RowsAffected()
tx.Commit()
return
}
func (tdb *TelemDb) AddEvents(events ...skylab.BusEvent) (int64, error) {
return tdb.AddEventsCtx(context.Background(), events...)
}
// GetActiveDrive finds the non-null drive and returns it, if any.
func (tdb *TelemDb) GetActiveDrive() (res int, err error) {
err = tdb.db.Get(&res, "SELECT id FROM drive_records WHERE end_time IS NULL LIMIT 1")
return
}
func (tdb *TelemDb) NewDrive(start time.Time, note string) {
}
func (tdb *TelemDb) EndDrive() {
}
func (tdb *TelemDb) UpdateDrive(id int, note string) {
}

View file

@ -1,156 +0,0 @@
package db
import (
"bufio"
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/kschamplin/gotelem/internal/logparsers"
"github.com/kschamplin/gotelem/skylab"
)
// helper func to get a random bus event with random data.
func GetRandomBusEvent() skylab.BusEvent {
data := skylab.WsrVelocity{
MotorVelocity: 1.0,
VehicleVelocity: 4.0,
}
ev := skylab.BusEvent{
Timestamp: time.Now(),
Data: &data,
}
return ev
}
// exampleData is a telemetry log data snippet that
// we use to seed the database.
const exampleData = `1698013005.164 1455ED8FDBDFF4FC3BD
1698013005.168 1460000000000000000
1698013005.170 1470000000000000000
1698013005.172 1610000000000000000
1698013005.175 1210000000000000000
1698013005.177 157FFFFC74200000000
1698013005.181 1030000000000000000
1698013005.184 1430000000000000000
1698013005.187 04020D281405EA8FB41
1698013005.210 0413BDF81406AF70042
1698013005.212 042569F81408EF0FF41
1698013005.215 04358A8814041060242
1698013005.219 04481958140D2A40342
1698013005.221 0452DB2814042990442
1698013005.224 047AF948140C031FD41
1698013005.226 04B27A081401ACD0B42
1698013005.229 04DCEAA81403C8C0A42
1698013005.283 04E0378814024580142
1698013005.286 04F97908140BFBC0142
1698013005.289 050098A81402F0F0A42
1698013005.293 051E6AE81402AF20842
1698013005.297 0521AC081403A970742
1698013005.300 0535BB181403CEB0542
1698013005.304 054ECC0814088FE0142
1698013005.307 0554ED181401F44F341
1698013005.309 05726E48140D42BEB41
1698013005.312 059EFC98140EC400142
`
// MakeMockDatabase creates a new dummy database.
func MakeMockDatabase(name string) *TelemDb {
fstring := fmt.Sprintf("file:%s?mode=memory&cache=shared", name)
tdb, err := OpenRawDb(fstring)
if err != nil {
panic(err)
}
// seed the database now.
scanner := bufio.NewScanner(strings.NewReader(exampleData))
for scanner.Scan() {
str := scanner.Text()
bev, err := logparsers.ParsersMap["telem"](str)
if err != nil {
panic(err)
}
_, err = tdb.AddEvents(bev)
if err != nil {
panic(err)
}
}
return tdb
}
func TestTelemDb(t *testing.T) {
t.Run("test opening database", func(t *testing.T) {
// create our mock
tdb := MakeMockDatabase(t.Name())
tdb.db.Ping()
})
t.Run("test inserting bus event", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
type args struct {
events []skylab.BusEvent
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "add no packet",
args: args{
events: []skylab.BusEvent{},
},
wantErr: false,
},
{
name: "add single packet",
args: args{
events: []skylab.BusEvent{GetRandomBusEvent()},
},
wantErr: false,
},
{
name: "add multiple packet",
args: args{
events: []skylab.BusEvent{GetRandomBusEvent(), GetRandomBusEvent()},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if _, err := tdb.AddEvents(tt.args.events...); (err != nil) != tt.wantErr {
t.Errorf("TelemDb.AddEvents() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
})
t.Run("test getting packets", func(t *testing.T) {
tdb := MakeMockDatabase(t.Name())
ctx := context.Background()
f := BusEventFilter{}
limitMod := LimitOffsetModifier{Limit: 1}
pkt, err := tdb.GetPackets(ctx, f, limitMod)
if err != nil {
t.Fatalf("error getting packets: %v", err)
}
if len(pkt) != 1 {
t.Fatalf("expected exactly one response, got %d", len(pkt))
}
// todo - validate what this should be.
})
t.Run("test read-write packet", func(t *testing.T) {
})
}

View file

@ -1,172 +0,0 @@
package db
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/kschamplin/gotelem/skylab"
)
// Modifier augments SQL strings.
type Modifier interface {
ModifyStatement(*strings.Builder) error
}
// LimitOffsetModifier is a modifier to support pagniation.
type LimitOffsetModifier struct {
Limit int
Offset int
}
func (l LimitOffsetModifier) ModifyStatement(sb *strings.Builder) error {
clause := fmt.Sprintf(" LIMIT %d OFFSET %d", l.Limit, l.Offset)
sb.WriteString(clause)
return nil
}
// BusEventFilter is a filter for bus events.
type BusEventFilter struct {
Names []string
TimerangeStart time.Time
TimerangeEnd time.Time
}
// now we can optionally add a limit.
func (tdb *TelemDb) GetPackets(ctx context.Context, filter BusEventFilter, options ...Modifier) ([]skylab.BusEvent, error) {
// construct a simple
var whereFrags = make([]string, 0)
// if we're filtering by names, add a where clause for it.
if len(filter.Names) > 0 {
names := strings.Join(filter.Names, ", ")
qString := fmt.Sprintf("name IN (%s)", names)
whereFrags = append(whereFrags, qString)
}
// TODO: identify if we need a special case for both time ranges
// using BETWEEN since apparenlty that can be better?
// next, check if we have a start/end time, add constraints
if !filter.TimerangeEnd.IsZero() {
qString := fmt.Sprintf("ts <= %d", filter.TimerangeEnd.UnixMilli())
whereFrags = append(whereFrags, qString)
}
if !filter.TimerangeStart.IsZero() {
// we have an end range
qString := fmt.Sprintf("ts >= %d", filter.TimerangeStart.UnixMilli())
whereFrags = append(whereFrags, qString)
}
sb := strings.Builder{}
sb.WriteString("SELECT * from \"bus_events\"")
// construct the full statement.
if len(whereFrags) > 0 {
// use the where clauses.
sb.WriteString(" WHERE ")
sb.WriteString(strings.Join(whereFrags, " AND "))
}
// Augment our data further if there's i.e a limit modifier.
// TODO: factor this out maybe?
for _, m := range options {
m.ModifyStatement(&sb)
}
rows, err := tdb.db.QueryxContext(ctx, sb.String())
if err != nil {
return nil, err
}
defer rows.Close()
var events = make([]skylab.BusEvent, 0, 10)
for rows.Next() {
var ev skylab.RawJsonEvent
err := rows.Scan(&ev.Timestamp, &ev.Name, (*[]byte)(&ev.Data))
if err != nil {
return nil, err
}
BusEv := skylab.BusEvent{
Timestamp: time.UnixMilli(int64(ev.Timestamp)),
Name: ev.Name,
}
BusEv.Data, err = skylab.FromJson(ev.Name, ev.Data)
if err != nil {
return events, nil
}
events = append(events, BusEv)
}
err = rows.Err()
return events, err
}
// We now need a different use-case: we would like to extract a value from
// a specific packet.
// Datum is a single measurement - it is more granular than a packet.
// the classic example is bms_measurement.current
type Datum struct {
Timestamp time.Time `db:"timestamp"`
Value any `db:"val"`
}
// GetValues queries the database for values in a given time range.
// A value is a specific data point. For example, bms_measurement.current
// would be a value.
func (tdb *TelemDb) GetValues(ctx context.Context, bef BusEventFilter,
field string, opts ...Modifier) ([]Datum, error) {
// this fragment uses json_extract from sqlite to get a single
// nested value.
sb := strings.Builder{}
sb.WriteString(`SELECT ts as timestamp, json_extract(data, '$.' || ?) as val FROM bus_events WHERE `)
if len(bef.Names) != 1 {
return nil, errors.New("invalid number of names")
}
qStrings := []string{"name is ?"}
// add timestamp limit.
if !bef.TimerangeStart.IsZero() {
qString := fmt.Sprintf("ts >= %d", bef.TimerangeStart.UnixMilli())
qStrings = append(qStrings, qString)
}
if !bef.TimerangeEnd.IsZero() {
qString := fmt.Sprintf("ts <= %d", bef.TimerangeEnd.UnixMilli())
qStrings = append(qStrings, qString)
}
// join qstrings with AND
sb.WriteString(strings.Join(qStrings, " AND "))
for _, m := range opts {
if m == nil {
continue
}
m.ModifyStatement(&sb)
}
rows, err := tdb.db.QueryxContext(ctx, sb.String(), field, bef.Names[0])
if err != nil {
return nil, err
}
defer rows.Close()
data := make([]Datum, 0, 10)
for rows.Next() {
var d Datum = Datum{}
var ts int64
err = rows.Scan(&ts, &d.Value)
d.Timestamp = time.UnixMilli(ts)
if err != nil {
fmt.Print(err)
return data, err
}
data = append(data, d)
}
fmt.Print(rows.Err())
return data, nil
}

View file

@ -1,43 +0,0 @@
package db
// This file implements Packet modelling, which allows us to look up fields by name
type PacketDef struct {
Name string
Description string
Id int
}
type FieldDef struct {
Name string
SubName string
Packet string
Type string
}
// PacketNotFoundError is when a matching packet cannot be found.
type PacketNotFoundError string
func (e *PacketNotFoundError) Error() string {
return "packet not found: " + string(*e)
}
// GetPacketDefN retrieves a packet matching the given name, if it exists.
// returns PacketNotFoundError if a matching packet could not be found.
func (tdb *TelemDb) GetPacketDefN(name string) (*PacketDef, error) {
return nil, nil
}
// GetPacketDefF retrieves the parent packet for a given field.
// This function cannot return PacketNotFoundError since we have SQL FKs enforcing.
func (tdb *TelemDb) GetPacketDefF(field FieldDef) (*PacketDef, error) {
return nil, nil
}
// GetFieldDefs returns the given fields for a given packet definition.
func (tdb *TelemDb) GetFieldDefs(pkt PacketDef) ([]FieldDef, error) {
return nil, nil
}

View file

@ -2,6 +2,7 @@ package logparsers
import (
"encoding/hex"
"encoding/json"
"fmt"
"regexp"
"strconv"
@ -35,16 +36,16 @@ func NewFormatError(msg string, err error) error {
return &FormatError{msg: msg, err: err}
}
// type LineParserFunc is a function that takes a string
// type CanFrameParser is a function that takes a string
// and returns a can frame. This is useful for common
// can dump formats.
type LineParserFunc func(string) (can.Frame, time.Time, error)
type CanFrameParser func(string) (can.Frame, time.Time, error)
var candumpRegex = regexp.MustCompile(`^\((\d+)\.(\d{6})\) \w+ (\w+)#(\w+)$`)
func parseCanDumpLine(dumpLine string) (frame can.Frame, ts time.Time, err error) {
frame = can.Frame{}
ts = time.Unix(0,0)
ts = time.Unix(0, 0)
// dumpline looks like this:
// (1684538768.521889) can0 200#8D643546
// remove trailing newline/whitespaces
@ -84,13 +85,13 @@ func parseCanDumpLine(dumpLine string) (frame can.Frame, ts time.Time, err error
}
// TODO: add extended id support, need an example log and a test.
frame.Id = can.CanID{Id: uint32(id), Extended: false}
frame.Id = can.CanID{Id: uint32(id), Extended: false}
frame.Data = rawData
frame.Kind = can.CanDataFrame
ts = time.Unix(unixSeconds, unixMicros * int64(time.Microsecond))
ts = time.Unix(unixSeconds, unixMicros*int64(time.Microsecond))
return
return
}
@ -103,7 +104,7 @@ var telemRegex = regexp.MustCompile(`^(\d+)\.(\d{3}) (\w{3})(\w+)$`)
func parseTelemLogLine(line string) (frame can.Frame, ts time.Time, err error) {
frame = can.Frame{}
ts = time.Unix(0,0)
ts = time.Unix(0, 0)
// strip trailng newline since we rely on it being gone
line = strings.TrimSpace(line)
@ -157,20 +158,27 @@ func parseTelemLogLine(line string) (frame can.Frame, ts time.Time, err error) {
}
// BusParserFunc is a function that takes a string and returns a busevent.
type BusParserFunc func(string) (skylab.BusEvent, error)
// BusEventParser is a function that takes a string and returns a busevent.
type BusEventParser func(string) (skylab.BusEvent, error)
// parserBusEventMapper takes a line parser (that returns a can frame)
// skylabify JSON parser.
func parseSkylabifyLogLine(input string) (skylab.BusEvent, error) {
var b = skylab.BusEvent{}
err := json.Unmarshal([]byte(input), &b)
return b, err
}
// frameParseToBusEvent takes a line parser (that returns a can frame)
// and makes it return a busEvent instead.
func parserBusEventMapper(f LineParserFunc) BusParserFunc {
func frameParseToBusEvent(fun CanFrameParser) BusEventParser {
return func(s string) (skylab.BusEvent, error) {
var b = skylab.BusEvent{}
f, ts, err := f(s)
frame, ts, err := fun(s)
if err != nil {
return b, err
}
b.Timestamp = ts
b.Data, err = skylab.FromCanFrame(f)
b.Data, err = skylab.FromCanFrame(frame)
if err != nil {
return b, err
}
@ -179,7 +187,8 @@ func parserBusEventMapper(f LineParserFunc) BusParserFunc {
}
}
var ParsersMap = map[string]BusParserFunc{
"telem": parserBusEventMapper(parseTelemLogLine),
"candump": parserBusEventMapper(parseCanDumpLine),
var ParsersMap = map[string]BusEventParser{
"telem": frameParseToBusEvent(parseTelemLogLine),
"candump": frameParseToBusEvent(parseCanDumpLine),
"json": parseSkylabifyLogLine,
}

View file

@ -6,6 +6,7 @@ import (
"time"
"github.com/kschamplin/gotelem/internal/can"
"github.com/kschamplin/gotelem/skylab"
)
func Test_parseCanDumpLine(t *testing.T) {
@ -159,7 +160,6 @@ func Test_parseTelemLogLine_errors(t *testing.T) {
name: "utf8 corruption",
input: "1698180835.318 0619\xed\xa0\x80fsadfD805640X0EBE24",
},
}
for _, tt := range tests {
@ -171,3 +171,42 @@ func Test_parseTelemLogLine_errors(t *testing.T) {
})
}
}
func Test_parseSkylabifyLogLine(t *testing.T) {
type args struct {
input string
}
tests := []struct {
name string
args args
want skylab.BusEvent
wantErr bool
}{
{
name: "basic test",
args: args{
input: `{"ts":1685141873612,"id":259,"name":"wsl_velocity","data":{"motor_velocity":89.97547,"vehicle_velocity":2.38853}}`},
want: skylab.BusEvent{
Timestamp: time.UnixMilli(1685141873612),
Name: "wsl_velocity",
Data: &skylab.WslVelocity{
MotorVelocity: 89.97547,
VehicleVelocity: 2.38853,
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseSkylabifyLogLine(tt.args.input)
if (err != nil) != tt.wantErr {
t.Errorf("parseSkylabifyLogLine() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseSkylabifyLogLine() = %v, want %v", got, tt.want)
}
})
}
}

View file

@ -1,4 +1,4 @@
package db
package gotelem
import (
"embed"

View file

@ -1,4 +1,4 @@
package db
package gotelem
import (
"embed"

View file

@ -1,2 +1 @@
DROP TABLE "drive_records";
DROP TABLE "position_logs";

View file

@ -7,12 +7,3 @@ CREATE TABLE "drive_records" (
PRIMARY KEY("id" AUTOINCREMENT),
CONSTRAINT "duration_valid" CHECK(end_time is null or start_time < end_time)
);
CREATE TABLE "position_logs" (
"ts" INTEGER NOT NULL,
"source" TEXT NOT NULL,
"lat" REAL NOT NULL,
"lon" REAL NOT NULL,
"elevation" REAL,
CONSTRAINT "no_empty_source" CHECK(source is not "")
);

View file

@ -0,0 +1 @@
ALTER TABLE "bus_events" DROP COLUMN idx;

View file

@ -0,0 +1 @@
ALTER TABLE "bus_events" ADD COLUMN idx GENERATED ALWAYS AS (json_extract(data, '$.idx')) VIRTUAL;

View file

@ -0,0 +1,2 @@
DROP TABLE openmct_objects;
DROP INDEX openmct_key;

View file

@ -0,0 +1,6 @@
CREATE TABLE openmct_objects (
data TEXT,
key TEXT GENERATED ALWAYS AS (json_extract(data, '$.identifier.key')) VIRTUAL UNIQUE NOT NULL
);
-- fast key-lookup
CREATE INDEX openmct_key on openmct_objects(key);

30
openmct.go Normal file
View file

@ -0,0 +1,30 @@
//go:build openmct
package gotelem
import (
"embed"
"io/fs"
"net/http"
"github.com/go-chi/chi/v5"
)
// this package provides a web router for the statif openmct build.
// it should only be included if the build has been run,
// to do so, run npm install and then npm run build.
//go:embed web/dist
var public embed.FS
func OpenMCTRouter(r chi.Router) {
// strip the subdirectory
pfs, _ := fs.Sub(public, "web/dist")
// default route.
r.Handle("/*", http.FileServerFS(pfs))
}
func init() {
RouterMods = append(RouterMods, OpenMCTRouter)
}

View file

@ -68,3 +68,39 @@ Certain features, like socketCAN support, are only enabled on platforms that sup
This is handled automatically; builds will exclude the socketCAN files and
the additional commands and features will not be present in the CLI.
### Lightweight Build
This doesn't include the OpenMCT files, but is simpler to build, and doesn't require Node setup.
You must install Go.
```
$ go build ./cmd/gotelem
```
### Full Build
This includes an integrated OpenMCT build, which automatically connects to the Telemetry server
for historical and live data. You must have both Go and Node.JS installed.
```
$ cd web/
$ npm install
$ npm run build
$ cd ..
$ go build -tags openmct ./cmd/gotelem
```
## Development
During development, it can be useful to have the OpenMCT sources be served separately from Gotelem,
so you don't need to rebuild everything. This case is supported:
```
$ go run ./cmd/gotelem server --db gotelem.db # in one terminal
$ npm run serve # in a separate terminal
```
When using the dev server, webpack will set the Gotelem URL to `localhost:8080`. If you're running
Gotelem using the default settings, this should work out of the box. Making changes to the OpenMCT
plugins will trigger a refresh automatically.

View file

@ -18,37 +18,37 @@ import (
// SkylabFile is a yaml file from skylab.
type SkylabFile struct {
Packets []PacketDef `yaml:"packets"`
Boards []BoardDef `yaml:"boards"`
Packets []PacketDef `yaml:"packets,omitempty" json:"packets,omitempty"`
Boards []BoardDef `yaml:"boards,omitempty" json:"boards,omitempty"`
}
type BoardDef struct {
Name string `yaml:"name"`
Transmit []string `yaml:"transmit"`
Receive []string `yaml:"receive"`
Name string `yaml:"name,omitempty" json:"name,omitempty"`
Transmit []string `yaml:"transmit,omitempty" json:"transmit,omitempty"`
Receive []string `yaml:"receive,omitempty" json:"receive,omitempty"`
}
// data field.
type FieldDef struct {
Name string `yaml:"name"`
Type string `yaml:"type"`
Units string `yaml:"units"`
Conversion float32 `yaml:"conversion"`
Name string `yaml:"name,omitempty" json:"name,omitempty"`
Type string `yaml:"type,omitempty" json:"type,omitempty"`
Units string `yaml:"units,omitempty" json:"units,omitempty"`
Conversion float32 `yaml:"conversion,omitempty" json:"conversion,omitempty"`
Bits []struct {
Name string `yaml:"name"`
} `yaml:"bits"`
Name string `yaml:"name,omitempty" json:"name,omitempty"`
} `yaml:"bits,omitempty" json:"bits,omitempty"`
}
// a PacketDef is a full can packet.
type PacketDef struct {
Name string `yaml:"name"`
Description string `yaml:"description"`
Id uint32 `yaml:"id"`
Endian string `yaml:"endian"`
Extended bool `yaml:"is_extended"`
Repeat int `yaml:"repeat"`
Offset int `yaml:"offset"`
Data []FieldDef `yaml:"data"`
Name string `yaml:"name,omitempty" json:"name,omitempty"`
Description string `yaml:"description,omitempty" json:"description,omitempty"`
Id uint32 `yaml:"id,omitempty" json:"id,omitempty"`
Endian string `yaml:"endian,omitempty" json:"endian,omitempty"`
IsExtended bool `yaml:"is_extended,omitempty" json:"is_extended,omitempty"`
Repeat int `yaml:"repeat,omitempty" json:"repeat,omitempty"`
Offset int `yaml:"offset,omitempty" json:"offset,omitempty"`
Data []FieldDef `yaml:"data,omitempty" json:"data,omitempty"`
}
// we need to generate bitfield types.
@ -278,13 +278,13 @@ func idToString(p PacketDef) string {
if p.Repeat > 0 {
resp := make([]string, p.Repeat)
for idx := 0; idx < p.Repeat; idx++ {
resp[idx] = fmt.Sprintf("can.CanID{ Id: 0x%X, Extended: %t }", int(p.Id)+idx*p.Offset, p.Extended)
resp[idx] = fmt.Sprintf("can.CanID{ Id: 0x%X, Extended: %t }", int(p.Id)+idx*p.Offset, p.IsExtended)
}
return strings.Join(resp, ",")
} else {
return fmt.Sprintf("can.CanID{ Id: 0x%X, Extended: %t }", p.Id, p.Extended)
return fmt.Sprintf("can.CanID{ Id: 0x%X, Extended: %t }", p.Id, p.IsExtended)
}
}

View file

@ -3,6 +3,7 @@
package skylab
import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
@ -74,7 +75,6 @@ type Sizer interface {
// CanSend takes a packet and makes CAN framing data.
func ToCanFrame(p Packet) (f can.Frame, err error) {
f.Id, err = p.CanId()
if err != nil {
return
@ -95,9 +95,9 @@ type RawJsonEvent struct {
// BusEvent is a timestamped Skylab packet - it contains
type BusEvent struct {
Timestamp time.Time `json:"ts"`
Name string `json:"id"`
Data Packet `json:"data"`
Timestamp time.Time
Name string
Data Packet
}
func (e BusEvent) MarshalJSON() (b []byte, err error) {
@ -117,6 +117,9 @@ func (e BusEvent) MarshalJSON() (b []byte, err error) {
}
// UnmarshalJSON implements JSON unmarshalling. Note that this
// uses RawJSON events, which are formatted differently.
// also it uses int64 milliseconds instead of times.
func (e *BusEvent) UnmarshalJSON(b []byte) error {
j := &RawJsonEvent{}
@ -133,6 +136,19 @@ func (e *BusEvent) UnmarshalJSON(b []byte) error {
return err
}
// Equals compares two bus events deeply.
func (e *BusEvent) Equals(other *BusEvent) bool {
if e.Name != other.Name {
return false
}
if !e.Timestamp.Equal(other.Timestamp) {
return false
}
pkt1, _ := e.Data.MarshalPacket()
pkt2, _ := e.Data.MarshalPacket()
return bytes.Equal(pkt1, pkt2)
}
// we need to be able to parse the JSON as well. this is done using the
// generator since we can use the switch/case thing since it's the fastest
@ -146,8 +162,9 @@ func (e *UnknownIdError) Error() string {
type BadLengthError struct {
expected uint32
actual uint32
actual uint32
}
func (e *BadLengthError) Error() string {
return fmt.Sprintf("bad data length, expected %d, got %d", e.expected, e.actual)
}

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load diff

View file

@ -46,7 +46,7 @@ type {{$structName}} struct {
}
func (p *{{$structName}}) CanId() (can.CanID, error) {
c := can.CanID{Extended: {{.Extended}}}
c := can.CanID{Extended: {{.IsExtended}}}
{{- if .Repeat }}
if p.Idx >= {{.Repeat}} {
return c, &UnknownIdError{ {{ printf "0x%X" .Id }} }
@ -108,10 +108,10 @@ var idMap = map[can.CanID]bool{
{{ range $p := .Packets -}}
{{ if $p.Repeat }}
{{ range $idx := Nx (int $p.Id) $p.Repeat $p.Offset -}}
{ Id: {{ $idx | printf "0x%X"}}, Extended: {{$p.Extended}} }: true,
{ Id: {{ $idx | printf "0x%X"}}, Extended: {{$p.IsExtended}} }: true,
{{ end }}
{{- else }}
{ Id: {{ $p.Id | printf "0x%X" }}, Extended: {{$p.Extended}} }: true,
{ Id: {{ $p.Id | printf "0x%X" }}, Extended: {{$p.IsExtended}} }: true,
{{- end}}
{{- end}}
}

View file

@ -3,6 +3,7 @@ package skylab
import (
"testing"
"reflect"
"encoding/json"
)
@ -44,4 +45,20 @@ func TestJSON{{$structName}}(t *testing.T) {
}
func TestCanFrame{{$structName}}(t *testing.T) {
v := &{{$structName}}{}
frame, err := ToCanFrame(v)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
retpkt, err := FromCanFrame(frame)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if !reflect.DeepEqual(v, retpkt) {
t.Fatalf("decoded packet did not match sent %v got %v", v, retpkt)
}
}
{{- end }}

View file

@ -12,5 +12,3 @@ import (
// this program demonstrates basic CAN stuff.
// i give up this shit is so hard

View file

@ -134,17 +134,18 @@ func (sck *CanSocket) Send(msg *can.Frame) error {
idToWrite := msg.Id.Id
if (msg.Id.Extended) {
if msg.Id.Extended {
idToWrite &= unix.CAN_EFF_MASK
idToWrite |= unix.CAN_EFF_FLAG
}
switch msg.Kind {
case can.CanRTRFrame:
idToWrite |= unix.CAN_RTR_FLAG
case can.CanErrFrame:
return errors.New("you can't send error frames")
case can.CanDataFrame:
default:
return errors.New("unknown frame type")
}
@ -191,10 +192,10 @@ func (sck *CanSocket) Recv() (*can.Frame, error) {
id.Id = raw_id
if raw_id&unix.CAN_EFF_FLAG != 0 {
// extended id frame
id.Extended = true;
id.Extended = true
} else {
// it's a normal can frame
id.Extended = false;
id.Extended = false
}
var k can.Kind = can.CanDataFrame
@ -203,8 +204,8 @@ func (sck *CanSocket) Recv() (*can.Frame, error) {
// we got an error...
k = can.CanErrFrame
}
if raw_id & unix.CAN_RTR_FLAG != 0 {
if raw_id&unix.CAN_RTR_FLAG != 0 {
k = can.CanRTRFrame
}

View file

@ -29,7 +29,7 @@ func TestCanSocket(t *testing.T) {
}
})
t.Run("test name", func(t *testing.T) {
t.Run("test interface name", func(t *testing.T) {
sock, _ := NewCanSocket("vcan0")
defer sock.Close()
@ -51,7 +51,7 @@ func TestCanSocket(t *testing.T) {
err := sock.Send(testFrame)
if err != nil {
t.Error(err)
t.Fatal(err)
}
})

131
web/.gitignore vendored Normal file
View file

@ -0,0 +1,131 @@
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
lerna-debug.log*
.pnpm-debug.log*
# Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
# Runtime data
pids
*.pid
*.seed
*.pid.lock
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
*.lcov
# nyc test coverage
.nyc_output
# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# Bower dependency directory (https://bower.io/)
bower_components
# node-waf configuration
.lock-wscript
# Compiled binary addons (https://nodejs.org/api/addons.html)
build/Release
# Dependency directories
node_modules/
jspm_packages/
# Snowpack dependency directory (https://snowpack.dev/)
web_modules/
# TypeScript cache
*.tsbuildinfo
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# Optional stylelint cache
.stylelintcache
# Microbundle cache
.rpt2_cache/
.rts2_cache_cjs/
.rts2_cache_es/
.rts2_cache_umd/
# Optional REPL history
.node_repl_history
# Output of 'npm pack'
*.tgz
# Yarn Integrity file
.yarn-integrity
# dotenv environment variable files
.env
.env.development.local
.env.test.local
.env.production.local
.env.local
# parcel-bundler cache (https://parceljs.org/)
.cache
.parcel-cache
# Next.js build output
.next
out
# Nuxt.js build / generate output
.nuxt
dist
# Gatsby files
.cache/
# Comment in the public line in if your project uses Gatsby and not Next.js
# https://nextjs.org/blog/next-9-1#public-directory-support
# public
# vuepress build output
.vuepress/dist
# vuepress v2.x temp and cache directory
.temp
.cache
# Docusaurus cache and generated files
.docusaurus
# Serverless directories
.serverless/
# FuseBox cache
.fusebox/
# DynamoDB Local files
.dynamodb/
# TernJS port file
.tern-port
# Stores VSCode versions used for testing VSCode extensions
.vscode-test
# yarn v2
.yarn/cache
.yarn/unplugged
.yarn/build-state.yml
.yarn/install-state.gz
.pnp.*

10
web/eslint.config.js Normal file
View file

@ -0,0 +1,10 @@
import eslint from '@eslint/js';
import tseslint from 'typescript-eslint';
export default tseslint.config(
eslint.configs.recommended,
...tseslint.configs.recommended,
{
"ignores": ["dist/*"]
}
);

5920
web/package-lock.json generated Normal file

File diff suppressed because it is too large Load diff

30
web/package.json Normal file
View file

@ -0,0 +1,30 @@
{
"name": "g1_openmct",
"version": "1.0.0",
"description": "dev environment for openmct plugins for g1 strategy tool",
"type": "module",
"scripts": {
"build": "webpack --config webpack.prod.js",
"serve": "webpack serve --config webpack.dev.js"
},
"author": "",
"license": "ISC",
"dependencies": {
"openmct": "^3.2.1"
},
"devDependencies": {
"@types/node": "^20.11.25",
"@types/webpack": "^5.28.5",
"copy-webpack-plugin": "^12.0.2",
"eslint": "^8.57.0",
"html-webpack-plugin": "^5.6.0",
"ts-loader": "^9.5.1",
"ts-node": "^10.9.2",
"typescript": "^5.4.2",
"typescript-eslint": "^7.1.1",
"webpack": "^5.90.3",
"webpack-cli": "^5.1.4",
"webpack-dev-server": "^5.0.2",
"webpack-merge": "^5.10.0"
}
}

332
web/src/app.ts Normal file
View file

@ -0,0 +1,332 @@
import openmct from "openmct";
//@ts-expect-error openmct
openmct.setAssetPath('openmct');
//@ts-expect-error openmct
openmct.install(openmct.plugins.LocalStorage());
//@ts-expect-error openmct
openmct.install(openmct.plugins.MyItems());
//@ts-expect-error openmct
openmct.install(openmct.plugins.Timeline());
//@ts-expect-error openmct
openmct.install(openmct.plugins.UTCTimeSystem());
//@ts-expect-error openmct
openmct.install(openmct.plugins.Clock({ enableClockIndicator: true }));
//@ts-expect-error openmct
openmct.install(openmct.plugins.Timer());
//@ts-expect-error openmct
openmct.install(openmct.plugins.Timelist());
//@ts-expect-error openmct
openmct.install(openmct.plugins.Hyperlink())
//@ts-expect-error openmct
openmct.install(openmct.plugins.Notebook())
//@ts-expect-error openmct
openmct.install(openmct.plugins.BarChart())
//@ts-expect-error openmct
openmct.install(openmct.plugins.ScatterPlot())
//@ts-expect-error openmct
openmct.install(openmct.plugins.SummaryWidget())
//@ts-expect-error openmct
openmct.install(openmct.plugins.LADTable());
openmct.time.clock('local', { start: -5 * 60 * 1000, end: 0 });
//@ts-expect-error openmct
openmct.time.timeSystem('utc');
//@ts-expect-error openmct
openmct.install(openmct.plugins.Espresso());
openmct.install(
//@ts-expect-error openmct
openmct.plugins.Conductor({
menuOptions: [
{
name: 'Fixed',
timeSystem: 'utc',
bounds: {
start: Date.now() - 30000000,
end: Date.now()
},
},
{
name: 'Realtime',
timeSystem: 'utc',
clock: 'local',
clockOffsets: {
start: -30000000,
end: 30000
},
}
]
})
);
if (process.env.BASE_URL) {
console.log("got a thing")
console.log(process.env.BASE_URL)
}
interface SkylabField {
name: string
type: string
units?: string
conversion?: number
bits?: {
name: string
}
}
interface SkylabPacket {
name: string
description?: string
id: number
endian?: string
is_extended: boolean
repeat?: number
offset?: number
data: [SkylabField]
}
interface SkylabBoard {
name: string
transmit: [string]
receive: [string]
}
interface SkylabSchema {
packets: [SkylabPacket]
boards: [SkylabBoard]
}
let schemaCached = null;
function getSchema(): Promise<SkylabSchema> {
if (schemaCached === null) {
return fetch(`${process.env.BASE_URL}/api/v1/schema`).then((resp) => {
const res = resp.json()
console.log("got schema, caching", res);
schemaCached = res
return res
})
}
return Promise.resolve(schemaCached)
}
const objectProvider = {
get: function (id) {
return getSchema().then((schema) => {
if (id.key === "car") {
const comp = schema.packets.map((x) => {
return {
key: x.name,
namespace: "umnsvp"
}
})
return {
identifier: id,
name: "the solar car",
type: 'folder',
location: 'ROOT',
composition: comp
}
}
const pkt = schema.packets.find((x) => x.name === id.key)
if (pkt) {
// if the key matches one of the packet names,
// we know it's a packet.
// construct a list of fields for this packet.
const comp = pkt.data.map((field) => {
if (field.type === "bitfield") {
//
}
return {
// we have to do this since
// we can't get the packet name otherwise.
key: `${pkt.name}.${field.name}`,
namespace: "umnsvp"
}
})
return {
identifier: id,
name: pkt.name,
type: 'folder',
composition: comp
}
}
// at this point it's definitely a field aka umnsvp-datum
const [pktName, fieldName] = id.key.split('.')
return {
identifier: id,
name: fieldName,
type: 'umnsvp-datum',
conversion: schema.packets.find((x) => x.name === pktName).data.find((f) => f.name === fieldName).conversion,
telemetry: {
values: [
{
key: "value",
source: "val",
name: "Value",
"format": "float",
hints: {
range: 1
}
},
{
key: "utc",
source: "ts",
name: "Timestamp",
format: "utc",
hints: {
domain: 1
}
}
]
}
}
})
}
}
interface Datum {
ts: number
val: number
}
const TelemHistoryProvider = {
supportsRequest: function (dObj) {
return dObj.type === 'umnsvp-datum'
},
request: function (dObj, opt) {
const [pktName, fieldName] = dObj.identifier.key.split('.')
const url = `${process.env.BASE_URL}/api/v1/packets/${pktName}/${fieldName}?`
const params = new URLSearchParams({
start: new Date(opt.start).toISOString(),
end: new Date(opt.end).toISOString(),
})
console.log((opt.end - opt.start) / opt.size)
return fetch(url + params).then((resp) => {
resp.json().then((result: [Datum]) => {
if (dObj.conversion && dObj.conversion != 0) {
// apply conversion
result.map((dat) => {
dat.val = dat.val * dObj.conversion
return dat
})
}
return result
})
})
}
}
interface PacketData {
ts: number
name: string
data: object
}
function TelemRealtimeProvider() {
return function (openmct: openmct.OpenMCT) {
const simpleIndicator = openmct.indicators.simpleIndicator();
openmct.indicators.add(simpleIndicator);
simpleIndicator.text("0 Listeners")
const url = `${process.env.BASE_URL.replace(/^http/, 'ws')}/api/v1/packets/subscribe?`
// we put our websocket connection here.
let connection = new WebSocket(url)
// connections contains name: callback mapping
const callbacks = {}
const conversions: Map<string, number> = new Map()
// names contains a set of *packet names*
const names = new Set()
function handleMessage(event) {
const data: PacketData = JSON.parse(event.data)
for (const [key, value] of Object.entries(data.data)) { // for each of the fields in the data
const id = `${data.name}.${key}` // if we have a matching callback for that field.
if (id in callbacks) {
// we should construct a telem point and make a callback.
// compute if we need to scale the value.
callbacks[id]({
"ts": data.ts,
"val": value * conversions.get(id)
})
}
}
}
function updateWebsocket() {
const params = new URLSearchParams()
for (const name in names) {
params.append("name", name)
}
connection = new WebSocket(url + params)
connection.onmessage = handleMessage
simpleIndicator.text(`${names.size} Listeners`)
}
const provider = {
supportsSubscribe: function (dObj) {
return dObj.type === "umnsvp-datum"
},
subscribe: function (dObj, callback) {
// identifier is packetname.fieldname. we add the packet name to the set.
const key = dObj.identifier.key
const pktName = key.split('.')[0]
// add our callback to the dictionary,
// add the packet name to the set
callbacks[key] = callback
conversions.set(key, dObj.conversion || 1)
names.add(pktName)
// update the websocket URL with the new name.
updateWebsocket()
return function unsubscribe() {
// if there's no more listeners on this packet,
// we can remove it.
console.log("subscribe called %s", JSON.stringify(dObj))
if (!Object.keys(callbacks).some((k) => k.startsWith(pktName))) {
names.delete(pktName)
updateWebsocket()
}
delete callbacks[key]
conversions.delete(key)
}
}
}
openmct.telemetry.addProvider(provider)
}
}
function GotelemPlugin() {
return function install(openmct) {
openmct.types.addType('umnsvp-datum', {
name: "UMN SVP Data Field",
description: "A data field of a packet from the car",
creatable: false,
cssClass: "icon-telemetry"
})
openmct.objects.addRoot({
namespace: "umnsvp",
key: 'car'
}, openmct.priority.HIGH)
openmct.objects.addProvider('umnsvp', objectProvider);
openmct.telemetry.addProvider(TelemHistoryProvider)
openmct.install(TelemRealtimeProvider())
}
}
openmct.install(GotelemPlugin())
//@ts-expect-error openmct
openmct.start()

10
web/src/index.html Normal file
View file

@ -0,0 +1,10 @@
<!DOCTYPE html>
<html>
<head>
<title>Open MCT Tutorials</title>
<script src="openmct/openmct.js"></script>
</head>
<body>
<div id="app"></div>
</body>
</html>

19
web/tsconfig.json Normal file
View file

@ -0,0 +1,19 @@
{
"compilerOptions": {
// "baseUrl": "./src",
"target": "es6",
"checkJs": true,
"allowJs": true,
// "moduleResolution": "NodeNext",
"allowSyntheticDefaultImports": true,
"paths": {
"openmct": ["./node_modules/openmct/dist/openmct.d.ts"]
},
"esModuleInterop": true,
},
"exclude": [
"./dist/**/*",
"webpack.*.js",
"eslint.config.js"
]
}

40
web/webpack.common.js Normal file
View file

@ -0,0 +1,40 @@
import path from 'path';
import {fileURLToPath} from 'url';
import HtmlWebpackPlugin from 'html-webpack-plugin';
import CopyPlugin from 'copy-webpack-plugin';
const config = {
entry: './src/app.ts',
module: {
rules: [
{
test: /\.tsx?$/,
use: 'ts-loader',
exclude: /node_modules/,
},
],
},
plugins: [
new HtmlWebpackPlugin({
template: 'src/index.html',
filename: 'index.html',
}),
new CopyPlugin({
patterns: [
{ from: "**/*", to: "openmct/", context: "node_modules/openmct/dist"},
]
})
],
resolve: {
extensions: ['.tsx', '.ts', '.js'],
},
externals: {
openmct: "openmct",
},
output: {
filename: 'main.js',
path: path.resolve(path.dirname(fileURLToPath(import.meta.url)), 'dist'),
},
};
export default config

24
web/webpack.dev.js Normal file
View file

@ -0,0 +1,24 @@
import {merge} from 'webpack-merge';
import common from "./webpack.common.js"
import webpack from 'webpack'
const config = merge(common, {
mode: "development",
devtool: 'inline-source-map',
plugins: [
new webpack.EnvironmentPlugin({
NODE_ENV: "development",
BASE_URL: "http://localhost:8080"
}),
],
devServer: {
static: "./dist",
headers: {
"Access-Control-Allow-Origin": "*",
'Access-Control-Allow-Headers': '*',
'Access-Control-Allow-Methods': '*',
},
},
})
export default config

16
web/webpack.prod.js Normal file
View file

@ -0,0 +1,16 @@
import { merge } from 'webpack-merge'
import common from './webpack.common.js'
import webpack from 'webpack'
const config = merge(common, {
mode: "production",
plugins: [
new webpack.EnvironmentPlugin({
NODE_ENV: "production",
BASE_URL: "",
})
],
devtool: 'source-map',
})
export default config

View file

@ -5,7 +5,6 @@ import (
"encoding/binary"
"errors"
"io"
)
// Frameable is an object that can be sent in an XBee Frame. An XBee Frame
@ -104,7 +103,6 @@ func xbeeFrameSplit(data []byte, atEOF bool) (advance int, token []byte, err err
}
// FIXME: add bounds checking! this can panic.
var frameLen = int(binary.BigEndian.Uint16(data[startIdx+1:startIdx+3])) + 4
// if the value of frameLen is > 0x100, we know that it's screwed up.
// this helps keep error duration lowered.

View file

@ -103,14 +103,13 @@ func Test_xbeeFrameSplit(t *testing.T) {
{
name: "start delimiter inside partial packet",
args: args{
data: advTest,
data: advTest,
atEOF: false,
},
wantAdvance: 2,
wantToken: nil,
wantErr: false,
wantToken: nil,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -129,8 +128,6 @@ func Test_xbeeFrameSplit(t *testing.T) {
}
}
func Test_parseFrame(t *testing.T) {
type args struct {
frame []byte