fix: refactor signaler; fix logic for local watcher

This commit is contained in:
Charles Hathaway
2023-09-21 21:50:13 -07:00
parent 9bbe917e59
commit 7fbd4fff69
8 changed files with 920 additions and 1076 deletions
+148 -106
View File
@@ -5,7 +5,7 @@ import (
"context"
"encoding/base64"
"fmt"
"net/http"
"strings"
"sync"
"time"
@@ -22,29 +22,39 @@ type camera struct {
id string
}
type session struct {
id string
cameraID string
createTime time.Time
toCamera chan *pb.IceMessage
toClient chan *pb.IceMessage
}
type Server struct {
mu sync.Mutex
camerasByHome map[string]map[string]*camera
sessionsByCamera map[string]map[string]*pb.Session
sessionsById map[string]*pb.Session
waitersBySessionId map[string][]chan<- bool
mu sync.Mutex
camerasByHome map[string]map[string]*camera
sessionsByCamera map[string]chan *session
sessionsById map[string]*session
}
func New() *Server {
return &Server{
camerasByHome: make(map[string]map[string]*camera),
sessionsByCamera: make(map[string]map[string]*pb.Session),
sessionsById: make(map[string]*pb.Session),
waitersBySessionId: make(map[string][]chan<- bool),
s := &Server{
camerasByHome: make(map[string]map[string]*camera),
sessionsByCamera: make(map[string]chan *session),
sessionsById: make(map[string]*session),
}
go s.cleanup()
return s
}
func (s *Server) CreateAuthToken(ctx context.Context, request *connect.Request[pb.CreateAuthTokenRequest]) (*connect.Response[pb.AuthToken], error) {
req := request.Msg
var id string
switch req.Type.(type) {
case *pb.CreateAuthTokenRequest_Camera_:
id := req.GetCamera().GetId()
id = req.GetCamera().GetId()
s.mu.Lock()
thisCamera := &camera{
id: id,
@@ -54,15 +64,18 @@ func (s *Server) CreateAuthToken(ctx context.Context, request *connect.Request[p
s.camerasByHome[home] = make(map[string]*camera)
}
s.camerasByHome[home][id] = thisCamera
if _, ok := s.sessionsByCamera[id]; !ok {
s.sessionsByCamera[id] = make(chan *session, 100)
}
s.mu.Unlock()
case *pb.CreateAuthTokenRequest_Client_:
myUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("error creating UUID: %v", err)
}
id = myUUID.String()
}
myUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("error creating UUID: %v", err)
}
id := myUUID.String()
token := &internalpb.AuthToken{
Uid: id,
Home: req.GetHome(),
@@ -100,129 +113,158 @@ func (s *Server) ListCameras(ctx context.Context, request *connect.Request[pb.Li
// Optionally, wait_for_update can be set to prevent returning until the Camera has seen the
// session request, populated candidates, and returned a session offer.
func (s *Server) CreateSession(ctx context.Context, request *connect.Request[pb.CreateSessionRequest]) (*connect.Response[pb.Session], error) {
req := request.Msg
if req.GetSession() == nil {
thisSession := request.Msg.Session
if thisSession == nil {
return nil, status.Errorf(codes.InvalidArgument, "nil session")
}
myUUID, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("error creating UUID: %v", err)
}
id := myUUID.String()
req.GetSession().Id = &pb.Session_Identifier{Id: id}
s.upsertSession(req.GetSession(), req.GetWaitForUpdate())
thisSession.Id = &pb.Session_Identifier{Id: id}
cameraID := thisSession.GetCamera().GetId()
s.mu.Lock()
defer s.mu.Unlock()
// Spin off goroutine to eventually cleanup the session
go s.scheduleCleanup(id, time.Minute)
sess := &session{
id: id,
cameraID: cameraID,
createTime: time.Now(),
toCamera: make(chan *pb.IceMessage, 100),
toClient: make(chan *pb.IceMessage, 100),
}
s.sessionsByCamera[cameraID] <- sess
s.sessionsById[id] = sess
returnSession := s.sessionsById[id]
return connect.NewResponse(returnSession), nil
return connect.NewResponse(thisSession), nil
}
// UpdateSession updates the session
func (s *Server) UpdateSession(ctx context.Context, request *connect.Request[pb.UpdateSessionRequest]) (*connect.Response[pb.Session], error) {
req := request.Msg
id := req.GetSession().GetId().GetId()
s.mu.Lock()
if _, ok := s.sessionsById[id]; !ok {
s.mu.Unlock()
return nil, status.Errorf(codes.NotFound, "no such session %q", id)
func (s *Server) PopSession(ctx context.Context, request *connect.Request[pb.PopSessionRequest]) (*connect.Response[pb.Session], error) {
authToken, err := getAuthToken(request)
if err != nil {
return nil, err
}
s.mu.Lock()
if _, ok := s.camerasByHome[authToken.Home]; !ok {
return nil, status.Errorf(codes.NotFound, "home %q not found", authToken.Home)
}
if _, ok := s.camerasByHome[authToken.Home][authToken.Uid]; !ok {
return nil, status.Errorf(codes.Unauthenticated, "you are not a camera")
}
ch := s.sessionsByCamera[authToken.Uid]
s.mu.Unlock()
s.upsertSession(req.GetSession(), req.WaitForUpdate)
sess := <-ch
return connect.NewResponse(&pb.Session{
Id: &pb.Session_Identifier{
Id: sess.id,
},
}), nil
}
func (s *Server) CreateIceMessage(ctx context.Context, request *connect.Request[pb.CreateIceMessageRequest]) (*connect.Response[pb.IceMessage], error) {
authToken, err := getAuthToken(request)
if err != nil {
return nil, err
}
req := request.Msg
s.mu.Lock()
defer s.mu.Unlock()
returnSession := s.sessionsById[id]
return connect.NewResponse(returnSession), nil
}
// ListSessions lists all sessions the client should consider.
//
// TODO: it would be better if we could alert a camera to poll for sessions
// i.e., with websockets (or streaming RPCs).
func (s *Server) ListSessions(context.Context, *connect.Request[pb.ListSessionsRequest]) (*connect.Response[pb.ListSessionsResponse], error) {
var sessions []*pb.Session
for _, session := range s.sessionsById {
sessions = append(sessions, session)
session, ok := s.sessionsById[req.GetSessionIdentifier().GetId()]
if !ok {
return nil, status.Errorf(codes.NotFound, "unknown session")
}
return connect.NewResponse(&pb.ListSessionsResponse{Sessions: sessions}), nil
if _, ok := s.camerasByHome[authToken.Home]; !ok {
return nil, status.Errorf(codes.NotFound, "home %q not found", authToken.Home)
}
_, isCamera := s.camerasByHome[authToken.Home][authToken.Uid]
session.createTime = time.Now()
msg := req.GetIceMessage()
if isCamera {
session.toClient <- msg
} else {
session.toCamera <- msg
}
return connect.NewResponse(msg), nil
}
func (s *Server) CreateIceCandidate(context.Context, *connect.Request[pb.CreateIceCandidateRequest]) (*connect.Response[pb.IceCandidate], error) {
return nil, fmt.Errorf("")
}
func (s *Server) PopIceMessage(ctx context.Context, request *connect.Request[pb.PopIceMessageRequest]) (*connect.Response[pb.IceMessage], error) {
authToken, err := getAuthToken(request)
if err != nil {
return nil, err
}
req := request.Msg
func (s *Server) PopIceCandidate(context.Context, *connect.Request[pb.PopIceCandidateRequest]) (*connect.Response[pb.IceCandidate], error) {
return nil, fmt.Errorf("")
}
func (s *Server) CreateIceSessionDescription(context.Context, *connect.Request[pb.CreateIceSessionDescriptionRequest]) (*connect.Response[pb.IceSessionDescription], error) {
return nil, fmt.Errorf("")
}
func (s *Server) PopIceSessionDescription(context.Context, *connect.Request[pb.PopIceSessionDescriptionRequest]) (*connect.Response[pb.IceSessionDescription], error) {
return nil, fmt.Errorf("")
}
// upsertSession updates or creates a session, optionally blocking until the session is updated
// again in the future.
//
// WARN: this function locks and unlocks s.mu; make sure no locks are held or it will block forever.
func (s *Server) upsertSession(session *pb.Session, waitForUpdate bool) {
s.mu.Lock()
defer s.mu.Unlock()
id := session.Id.Id
s.sessionsById[id] = session
cameraID := session.GetCamera().GetId()
if _, ok := s.sessionsByCamera[cameraID]; !ok {
s.sessionsByCamera[cameraID] = make(map[string]*pb.Session)
if _, ok := s.camerasByHome[authToken.Home]; !ok {
return nil, status.Errorf(codes.NotFound, "home %q not found", authToken.Home)
}
s.sessionsByCamera[cameraID][id] = session
_, isCamera := s.camerasByHome[authToken.Home][authToken.Uid]
session := s.sessionsById[req.GetSessionIdentifier().GetId()]
session.createTime = time.Now()
s.mu.Unlock()
// Alert anything that needs to go
for _, waiter := range s.waitersBySessionId[id] {
waiter <- true
close(waiter)
var msg *pb.IceMessage
if isCamera {
msg = <-session.toCamera
} else {
msg = <-session.toClient
}
s.waitersBySessionId[id] = nil
return connect.NewResponse(msg), nil
}
if waitForUpdate {
waitChan := make(chan bool)
s.waitersBySessionId[id] = append(s.waitersBySessionId[id], waitChan)
defer func() {
<-waitChan
func (s *Server) cleanup() {
ticker := time.NewTicker(time.Minute * 5)
for t := range ticker.C {
func() {
s.mu.Lock()
defer s.mu.Unlock()
// Look for any stale sessions
staleSessionsByCamera := make(map[string]*session)
for _, session := range s.sessionsById {
if t.Sub(session.createTime) > time.Minute {
if prev, ok := staleSessionsByCamera[session.cameraID]; ok {
// Only use this session if it was created after the previous one
if prev.createTime.Before(session.createTime) {
staleSessionsByCamera[session.cameraID] = session
}
} else {
staleSessionsByCamera[session.cameraID] = session
}
}
}
for camera, lastSession := range staleSessionsByCamera {
// Consume from the chanel until we remove all stale sessions
for sess := range s.sessionsByCamera[camera] {
if sess == lastSession {
// We consumed all stale channels; break
break
}
}
}
}()
}
}
func (s *Server) scheduleCleanup(sessionID string, waitPeriod time.Duration) {
time.Sleep(waitPeriod)
s.mu.Lock()
defer s.mu.Unlock()
session := s.sessionsById[sessionID]
for _, water := range s.waitersBySessionId[sessionID] {
water <- true
close(water)
}
delete(s.sessionsByCamera[session.GetCamera().GetId()], sessionID)
delete(s.sessionsById, sessionID)
delete(s.waitersBySessionId, sessionID)
}
func getAuthToken[T any](req *connect.Request[T]) (*internalpb.AuthToken, error) {
authHeader := req.Header().Get(http.CanonicalHeaderKey("Authorization"))
authHeader := req.Header().Get("Authorization")
if !strings.HasPrefix(authHeader, "Bearer ") {
return nil, fmt.Errorf("invalid authorization token")
}
authHeader = authHeader[len("Bearer "):]
bytes, err := base64.URLEncoding.DecodeString(authHeader)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "malformed authorization header (extract)")