// Package signaler implements a signaler server. package signaler import ( "context" "encoding/base64" "fmt" "net/http" "sync" "time" "connectrpc.com/connect" pb "github.com/chathaway-codes/home-sensors/v2/gen" internalpb "github.com/chathaway-codes/home-sensors/v2/gen/token" "github.com/gofrs/uuid/v5" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" ) type camera struct { id string } type Server struct { mu sync.Mutex camerasByHome map[string]map[string]*camera sessionsByCamera map[string]map[string]*pb.Session sessionsById map[string]*pb.Session waitersBySessionId map[string][]chan<- bool } func New() *Server { return &Server{ camerasByHome: make(map[string]map[string]*camera), sessionsByCamera: make(map[string]map[string]*pb.Session), sessionsById: make(map[string]*pb.Session), waitersBySessionId: make(map[string][]chan<- bool), } } func (s *Server) CreateAuthToken(ctx context.Context, request *connect.Request[pb.CreateAuthTokenRequest]) (*connect.Response[pb.AuthToken], error) { req := request.Msg switch req.Type.(type) { case *pb.CreateAuthTokenRequest_Camera_: id := req.GetCamera().GetId() s.mu.Lock() thisCamera := &camera{ id: id, } home := req.GetHome() if _, ok := s.camerasByHome[home]; !ok { s.camerasByHome[home] = make(map[string]*camera) } s.camerasByHome[home][id] = thisCamera s.mu.Unlock() } myUUID, err := uuid.NewV4() if err != nil { return nil, fmt.Errorf("error creating UUID: %v", err) } id := myUUID.String() token := &internalpb.AuthToken{ Uid: id, Home: req.GetHome(), } bytes, err := proto.Marshal(token) if err != nil { return nil, status.Errorf(codes.Internal, "failed to generate token: %v", err) } outToken := base64.URLEncoding.EncodeToString(bytes) return connect.NewResponse(&pb.AuthToken{ Token: outToken, }), nil } func (s *Server) ListCameras(ctx context.Context, request *connect.Request[pb.ListCamerasRequest]) (*connect.Response[pb.ListCamerasResponse], error) { authToken, err := getAuthToken(request) if err != nil { return nil, err } s.mu.Lock() defer s.mu.Unlock() var cameras []*pb.Camera for _, camera := range s.camerasByHome[authToken.Home] { cameras = append(cameras, &pb.Camera{ Identifier: &pb.Camera_Identifier{ Id: camera.id, }, }) } return connect.NewResponse(&pb.ListCamerasResponse{Cameras: cameras}), nil } // CreateSession creates a new session that can be seen bv the provided Camera and Peer. // // Optionally, wait_for_update can be set to prevent returning until the Camera has seen the // session request, populated candidates, and returned a session offer. func (s *Server) CreateSession(ctx context.Context, request *connect.Request[pb.CreateSessionRequest]) (*connect.Response[pb.Session], error) { req := request.Msg if req.GetSession() == nil { return nil, status.Errorf(codes.InvalidArgument, "nil session") } myUUID, err := uuid.NewV4() if err != nil { return nil, fmt.Errorf("error creating UUID: %v", err) } id := myUUID.String() req.GetSession().Id = &pb.Session_Identifier{Id: id} s.upsertSession(req.GetSession(), req.GetWaitForUpdate()) s.mu.Lock() defer s.mu.Unlock() // Spin off goroutine to eventually cleanup the session go s.scheduleCleanup(id, time.Minute) returnSession := s.sessionsById[id] return connect.NewResponse(returnSession), nil } // UpdateSession updates the session func (s *Server) UpdateSession(ctx context.Context, request *connect.Request[pb.UpdateSessionRequest]) (*connect.Response[pb.Session], error) { req := request.Msg id := req.GetSession().GetId().GetId() s.mu.Lock() if _, ok := s.sessionsById[id]; !ok { s.mu.Unlock() return nil, status.Errorf(codes.NotFound, "no such session %q", id) } s.mu.Unlock() s.upsertSession(req.GetSession(), req.WaitForUpdate) s.mu.Lock() defer s.mu.Unlock() returnSession := s.sessionsById[id] return connect.NewResponse(returnSession), nil } // ListSessions lists all sessions the client should consider. // // TODO: it would be better if we could alert a camera to poll for sessions // i.e., with websockets (or streaming RPCs). func (s *Server) ListSessions(context.Context, *connect.Request[pb.ListSessionsRequest]) (*connect.Response[pb.ListSessionsResponse], error) { var sessions []*pb.Session for _, session := range s.sessionsById { sessions = append(sessions, session) } return connect.NewResponse(&pb.ListSessionsResponse{Sessions: sessions}), nil } func (s *Server) CreateIceCandidate(context.Context, *connect.Request[pb.CreateIceCandidateRequest]) (*connect.Response[pb.IceCandidate], error) { return nil, fmt.Errorf("") } func (s *Server) PopIceCandidate(context.Context, *connect.Request[pb.PopIceCandidateRequest]) (*connect.Response[pb.IceCandidate], error) { return nil, fmt.Errorf("") } func (s *Server) CreateIceSessionDescription(context.Context, *connect.Request[pb.CreateIceSessionDescriptionRequest]) (*connect.Response[pb.IceSessionDescription], error) { return nil, fmt.Errorf("") } func (s *Server) PopIceSessionDescription(context.Context, *connect.Request[pb.PopIceSessionDescriptionRequest]) (*connect.Response[pb.IceSessionDescription], error) { return nil, fmt.Errorf("") } // upsertSession updates or creates a session, optionally blocking until the session is updated // again in the future. // // WARN: this function locks and unlocks s.mu; make sure no locks are held or it will block forever. func (s *Server) upsertSession(session *pb.Session, waitForUpdate bool) { s.mu.Lock() defer s.mu.Unlock() id := session.Id.Id s.sessionsById[id] = session cameraID := session.GetCamera().GetId() if _, ok := s.sessionsByCamera[cameraID]; !ok { s.sessionsByCamera[cameraID] = make(map[string]*pb.Session) } s.sessionsByCamera[cameraID][id] = session // Alert anything that needs to go for _, waiter := range s.waitersBySessionId[id] { waiter <- true close(waiter) } s.waitersBySessionId[id] = nil if waitForUpdate { waitChan := make(chan bool) s.waitersBySessionId[id] = append(s.waitersBySessionId[id], waitChan) defer func() { <-waitChan }() } } func (s *Server) scheduleCleanup(sessionID string, waitPeriod time.Duration) { time.Sleep(waitPeriod) s.mu.Lock() defer s.mu.Unlock() session := s.sessionsById[sessionID] for _, water := range s.waitersBySessionId[sessionID] { water <- true close(water) } delete(s.sessionsByCamera[session.GetCamera().GetId()], sessionID) delete(s.sessionsById, sessionID) delete(s.waitersBySessionId, sessionID) } func getAuthToken[T any](req *connect.Request[T]) (*internalpb.AuthToken, error) { authHeader := req.Header().Get(http.CanonicalHeaderKey("Authorization")) bytes, err := base64.URLEncoding.DecodeString(authHeader) if err != nil { return nil, status.Errorf(codes.InvalidArgument, "malformed authorization header (extract)") } authToken := &internalpb.AuthToken{} if err := proto.Unmarshal(bytes, authToken); err != nil { return nil, status.Errorf(codes.InvalidArgument, "malformed authorization header (parse)") } return authToken, nil }