352 lines
9.5 KiB
Go
352 lines
9.5 KiB
Go
// Package signaler implements a signaler server.
|
|
package signaler
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"connectrpc.com/connect"
|
|
pb "github.com/chathaway-codes/home-sensors/v2/gen"
|
|
internalpb "github.com/chathaway-codes/home-sensors/v2/gen/token"
|
|
"github.com/gofrs/uuid/v5"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
type camera struct {
|
|
id string
|
|
}
|
|
|
|
type session struct {
|
|
id string
|
|
cameraID string
|
|
createTime time.Time
|
|
toCamera chan *pb.IceMessage
|
|
toClient chan *pb.IceMessage
|
|
}
|
|
|
|
type Server struct {
|
|
mu sync.Mutex
|
|
camerasByHome map[string]map[string]*camera
|
|
|
|
sessionsByCamera map[string]chan *session
|
|
sessionsById map[string]*session
|
|
|
|
// Most recent sample
|
|
samplesByCamera map[string]map[pb.Sample_Type]*pb.Sample
|
|
}
|
|
|
|
func New() *Server {
|
|
s := &Server{
|
|
camerasByHome: make(map[string]map[string]*camera),
|
|
sessionsByCamera: make(map[string]chan *session),
|
|
sessionsById: make(map[string]*session),
|
|
samplesByCamera: make(map[string]map[pb.Sample_Type]*pb.Sample),
|
|
}
|
|
go s.cleanup()
|
|
return s
|
|
}
|
|
|
|
func (s *Server) CreateAuthToken(ctx context.Context, request *connect.Request[pb.CreateAuthTokenRequest]) (*connect.Response[pb.AuthToken], error) {
|
|
req := request.Msg
|
|
|
|
log.Printf("Creating auth token")
|
|
defer log.Printf("Done creating auth token")
|
|
var id string
|
|
switch req.Type.(type) {
|
|
case *pb.CreateAuthTokenRequest_Camera_:
|
|
id = req.GetCamera().GetId()
|
|
thisCamera := &camera{
|
|
id: id,
|
|
}
|
|
home := req.GetHome()
|
|
s.mu.Lock()
|
|
if _, ok := s.camerasByHome[home]; !ok {
|
|
s.camerasByHome[home] = make(map[string]*camera)
|
|
}
|
|
s.camerasByHome[home][id] = thisCamera
|
|
if _, ok := s.sessionsByCamera[id]; ok {
|
|
close(s.sessionsByCamera[id])
|
|
}
|
|
s.sessionsByCamera[id] = make(chan *session, 100)
|
|
s.mu.Unlock()
|
|
case *pb.CreateAuthTokenRequest_Client_:
|
|
myUUID, err := uuid.NewV4()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error creating UUID: %v", err)
|
|
}
|
|
id = myUUID.String()
|
|
}
|
|
|
|
token := &internalpb.AuthToken{
|
|
Uid: id,
|
|
Home: req.GetHome(),
|
|
}
|
|
bytes, err := proto.Marshal(token)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to generate token: %v", err)
|
|
}
|
|
outToken := base64.URLEncoding.EncodeToString(bytes)
|
|
return connect.NewResponse(&pb.AuthToken{
|
|
Token: outToken,
|
|
}), nil
|
|
}
|
|
|
|
func (s *Server) ListCameras(ctx context.Context, request *connect.Request[pb.ListCamerasRequest]) (*connect.Response[pb.ListCamerasResponse], error) {
|
|
authToken, err := getAuthToken(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
var cameras []*pb.Camera
|
|
for _, camera := range s.camerasByHome[authToken.Home] {
|
|
cameras = append(cameras, &pb.Camera{
|
|
Identifier: &pb.Camera_Identifier{
|
|
Id: camera.id,
|
|
},
|
|
})
|
|
}
|
|
return connect.NewResponse(&pb.ListCamerasResponse{Cameras: cameras}), nil
|
|
}
|
|
|
|
// CreateSession creates a new session that can be seen bv the provided Camera and Peer.
|
|
//
|
|
// Optionally, wait_for_update can be set to prevent returning until the Camera has seen the
|
|
// session request, populated candidates, and returned a session offer.
|
|
func (s *Server) CreateSession(ctx context.Context, request *connect.Request[pb.CreateSessionRequest]) (*connect.Response[pb.Session], error) {
|
|
|
|
log.Printf("Creating session")
|
|
defer log.Printf("Done session")
|
|
thisSession := request.Msg.Session
|
|
if thisSession == nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "nil session")
|
|
}
|
|
|
|
myUUID, err := uuid.NewV4()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error creating UUID: %v", err)
|
|
}
|
|
id := myUUID.String()
|
|
thisSession.Id = &pb.Session_Identifier{Id: id}
|
|
cameraID := thisSession.GetCamera().GetId()
|
|
|
|
sess := &session{
|
|
id: id,
|
|
cameraID: cameraID,
|
|
createTime: time.Now(),
|
|
toCamera: make(chan *pb.IceMessage, 100),
|
|
toClient: make(chan *pb.IceMessage, 100),
|
|
}
|
|
|
|
s.mu.Lock()
|
|
ch := s.sessionsByCamera[cameraID]
|
|
s.sessionsById[id] = sess
|
|
s.mu.Unlock()
|
|
|
|
ch <- sess
|
|
|
|
return connect.NewResponse(thisSession), nil
|
|
}
|
|
|
|
func (s *Server) PopSession(ctx context.Context, request *connect.Request[pb.PopSessionRequest]) (*connect.Response[pb.Session], error) {
|
|
authToken, err := getAuthToken(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.mu.Lock()
|
|
|
|
if _, ok := s.camerasByHome[authToken.Home]; !ok {
|
|
s.mu.Unlock()
|
|
return nil, status.Errorf(codes.NotFound, "home %q not found", authToken.Home)
|
|
}
|
|
if _, ok := s.camerasByHome[authToken.Home][authToken.Uid]; !ok {
|
|
s.mu.Unlock()
|
|
return nil, status.Errorf(codes.Unauthenticated, "you are not a camera")
|
|
}
|
|
|
|
ch := s.sessionsByCamera[authToken.Uid]
|
|
s.mu.Unlock()
|
|
|
|
sess := <-ch
|
|
|
|
if sess == nil {
|
|
return nil, status.Errorf(codes.DataLoss, "someone else stole the session")
|
|
}
|
|
|
|
return connect.NewResponse(&pb.Session{
|
|
Id: &pb.Session_Identifier{
|
|
Id: sess.id,
|
|
},
|
|
}), nil
|
|
}
|
|
|
|
func (s *Server) CreateIceMessage(ctx context.Context, request *connect.Request[pb.CreateIceMessageRequest]) (*connect.Response[pb.IceMessage], error) {
|
|
authToken, err := getAuthToken(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req := request.Msg
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
session, ok := s.sessionsById[req.GetSessionIdentifier().GetId()]
|
|
if !ok {
|
|
return nil, status.Errorf(codes.NotFound, "unknown session")
|
|
}
|
|
|
|
if _, ok := s.camerasByHome[authToken.Home]; !ok {
|
|
return nil, status.Errorf(codes.NotFound, "home %q not found", authToken.Home)
|
|
}
|
|
_, isCamera := s.camerasByHome[authToken.Home][authToken.Uid]
|
|
|
|
session.createTime = time.Now()
|
|
msg := req.GetIceMessage()
|
|
if isCamera {
|
|
session.toClient <- msg
|
|
} else {
|
|
session.toCamera <- msg
|
|
}
|
|
return connect.NewResponse(msg), nil
|
|
}
|
|
|
|
func (s *Server) PopIceMessage(ctx context.Context, request *connect.Request[pb.PopIceMessageRequest]) (*connect.Response[pb.IceMessage], error) {
|
|
authToken, err := getAuthToken(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req := request.Msg
|
|
|
|
s.mu.Lock()
|
|
|
|
if _, ok := s.camerasByHome[authToken.Home]; !ok {
|
|
return nil, status.Errorf(codes.NotFound, "home %q not found", authToken.Home)
|
|
}
|
|
_, isCamera := s.camerasByHome[authToken.Home][authToken.Uid]
|
|
session := s.sessionsById[req.GetSessionIdentifier().GetId()]
|
|
session.createTime = time.Now()
|
|
s.mu.Unlock()
|
|
|
|
var msg *pb.IceMessage
|
|
if isCamera {
|
|
msg = <-session.toCamera
|
|
} else {
|
|
msg = <-session.toClient
|
|
}
|
|
return connect.NewResponse(msg), nil
|
|
}
|
|
|
|
func (s *Server) CreateSample(ctx context.Context, request *connect.Request[pb.CreateSampleRequest]) (*connect.Response[pb.Sample], error) {
|
|
authToken, err := getAuthToken(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if _, ok := s.camerasByHome[authToken.Home]; !ok {
|
|
return nil, status.Errorf(codes.NotFound, "home %q not found", authToken.Home)
|
|
}
|
|
cam, ok := s.camerasByHome[authToken.Home][authToken.Uid]
|
|
if !ok {
|
|
return nil, status.Errorf(codes.Unauthenticated, "you are not a camera")
|
|
}
|
|
|
|
if _, ok := s.samplesByCamera[authToken.Uid]; !ok {
|
|
s.samplesByCamera[authToken.Uid] = make(map[pb.Sample_Type]*pb.Sample)
|
|
}
|
|
|
|
sample := request.Msg.GetSample()
|
|
s.samplesByCamera[authToken.Uid][sample.Type] = &pb.Sample{
|
|
Type: sample.Type,
|
|
Reading: sample.Reading,
|
|
|
|
CameraId: &pb.Camera_Identifier{
|
|
Id: cam.id,
|
|
},
|
|
}
|
|
return connect.NewResponse(s.samplesByCamera[authToken.Uid][sample.Type]), nil
|
|
}
|
|
|
|
func (s *Server) ListSamples(ctx context.Context, request *connect.Request[pb.ListSamplesRequest]) (*connect.Response[pb.ListSamplesResponse], error) {
|
|
authToken, err := getAuthToken(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if _, ok := s.camerasByHome[authToken.Home]; !ok {
|
|
return nil, status.Errorf(codes.NotFound, "home %q not found", authToken.Home)
|
|
}
|
|
|
|
var samples []*pb.Sample
|
|
for camera := range s.camerasByHome[authToken.Home] {
|
|
if sample, ok := s.samplesByCamera[camera]; ok {
|
|
for _, sample := range sample {
|
|
samples = append(samples, sample)
|
|
}
|
|
}
|
|
}
|
|
return connect.NewResponse(&pb.ListSamplesResponse{
|
|
Samples: samples,
|
|
}), nil
|
|
}
|
|
|
|
func (s *Server) cleanup() {
|
|
ticker := time.NewTicker(time.Minute * 5)
|
|
for t := range ticker.C {
|
|
func() {
|
|
log.Printf("Starting cleanup")
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
log.Printf("Cleanup locked")
|
|
|
|
// Look for any stale sessions
|
|
staleSessionsByCamera := make(map[string]*session)
|
|
for _, session := range s.sessionsById {
|
|
if t.Sub(session.createTime) > time.Minute {
|
|
if prev, ok := staleSessionsByCamera[session.cameraID]; ok {
|
|
// Only use this session if it was created after the previous one
|
|
if prev.createTime.Before(session.createTime) {
|
|
staleSessionsByCamera[session.cameraID] = session
|
|
}
|
|
} else {
|
|
staleSessionsByCamera[session.cameraID] = session
|
|
}
|
|
}
|
|
}
|
|
log.Printf("Removing stale sessions")
|
|
|
|
// TODO: how do we prevent sessions from accumlating if cameras don't pick up on the request?
|
|
}()
|
|
}
|
|
}
|
|
|
|
func getAuthToken[T any](req *connect.Request[T]) (*internalpb.AuthToken, error) {
|
|
authHeader := req.Header().Get("Authorization")
|
|
if !strings.HasPrefix(authHeader, "Bearer ") {
|
|
return nil, fmt.Errorf("invalid authorization token")
|
|
}
|
|
authHeader = authHeader[len("Bearer "):]
|
|
bytes, err := base64.URLEncoding.DecodeString(authHeader)
|
|
if err != nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "malformed authorization header (extract)")
|
|
}
|
|
authToken := &internalpb.AuthToken{}
|
|
if err := proto.Unmarshal(bytes, authToken); err != nil {
|
|
return nil, status.Errorf(codes.InvalidArgument, "malformed authorization header (parse)")
|
|
}
|
|
return authToken, nil
|
|
}
|