From 598804e95c385c35bb8f8ac9c9d3c8e993f15b56 Mon Sep 17 00:00:00 2001 From: nilo Date: Sun, 23 Mar 2025 17:58:22 -0300 Subject: [PATCH] blocking channels --- controllers/webhookController.go | 110 ++++++++----------------------- globals/globals.go | 8 ++- main.go | 9 +++ models/models.go | 10 ++- services/channelservices.go | 53 ++++++++++++++- utils/utils.go | 21 ++++++ 6 files changed, 123 insertions(+), 88 deletions(-) diff --git a/controllers/webhookController.go b/controllers/webhookController.go index bfd54b1..9efc62e 100644 --- a/controllers/webhookController.go +++ b/controllers/webhookController.go @@ -5,29 +5,22 @@ import ( "api/models" "api/services" "api/utils" - "bytes" - "encoding/json" "fmt" "log" - "net/http" "strings" + "time" "github.com/gofiber/fiber/v2" ) -func KickSession(stream_name, session_id string) { - url := "http://localhost:8083/api/ctrl/kick_session" +type ResponseTest struct { + Data DataTest `json:"data"` +} - values := map[string]string{"stream_name": stream_name, "session_id": session_id} - - jsonValue, err := json.Marshal(values) - if err != nil { - log.Printf("Error Marshall KickSession: %s\n", err) - } - _, err = http.Post(url, "application/json", bytes.NewBuffer(jsonValue)) - if err != nil { - log.Printf("Error Post KickSession: %s\n", err) - } +type DataTest struct { + CNPJ string `json:"cnpj"` + Email string `json:"email"` + Name string `json:"name"` } func ServerStart(c *fiber.Ctx) error { @@ -47,19 +40,6 @@ func ServerStart(c *fiber.Ctx) error { return c.SendString("Server started: " + string(c.Body())) } -// func OnRtmpConnect(c *fiber.Ctx) error { - -// body := new(models.Connect) - -// if err := c.BodyParser(body); err != nil { -// log.Printf("Error Start: %s\n", err) -// return err -// } - -// utils.PrettyPrintJson(body) -// return c.SendString("On_Rtmp_Connect: " + string(c.Body())) -// } - // func OnUpdate(c *fiber.Ctx) error { // p := new(models.Update) // if err := c.BodyParser(p); err != nil { @@ -79,25 +59,6 @@ func ServerStart(c *fiber.Ctx) error { // return c.SendString("On_Update: " + string(c.Body())) // } -// func OnSubStart(c *fiber.Ctx) error { -// p := new(models.Update) -// if err := c.BodyParser(p); err != nil { -// log.Printf("Error SubStart: %s\n", err) -// return err -// } - -// log.Printf("SubStart") -// utils.PrettyPrintJson(p) - -// // if len(p.Groups) > 0 { -// // for _, g := range p.Groups { -// // log.Printf("Update %s %s [(%dx%d) %d]\n", g.Channel, g.UpdPub.StartTime, g.VideoWidth, g.VideoHeight, g.UpdPub.ReadBytesSum) -// // } -// // } - -// return c.SendString("On_Substart: " + string(c.Body())) -// } - func OnPubStart(c *fiber.Ctx) error { // ================================================================ // Called when a publisher starts streaming - Start of Transmission @@ -114,35 +75,21 @@ func OnPubStart(c *fiber.Ctx) error { log.Printf("======================== Start StreamName %s, Transmission Key %s, PlayerKey %s, SessionId %s\n", p.StreamName, transmissionkey, playerkey, p.SessionId) // Get the channel from the database - channel, currentTransmission := services.GetChannelByName(p.StreamName) + ok, currentTransmission := services.VerifyTransmissionAuthorization(p.StreamName, p.SessionId, transmissionkey) log.Printf("%v\n", currentTransmission) - // If the channel is not found, kick the session - if channel.ID == 0 { - services.AddTransmissionlog(p.StreamName, "Canal inexistente") - KickSession(p.StreamName, p.SessionId) - return fiber.ErrForbidden - } - - // If the channel is not active, kick the session - if channel.Status != "A" { - services.AddTransmissionlog(p.StreamName, "Canal não está ativo") - KickSession(p.StreamName, p.SessionId) - return fiber.ErrForbidden - } - - // If the transmission key does not match, kick the session - if transmissionkey != channel.TransmissionKey { - services.AddTransmissionlog(p.StreamName, "Transmissão com chave de transmissão errada") - KickSession(p.StreamName, p.SessionId) - return fiber.ErrForbidden - } - // Check if the channel has authorized time to transmit - services.AddTransmissionlog(p.StreamName, "Transmissão iniciada") - return c.SendString("On_Pub_Start: " + string(c.Body())) + if ok { + // If the channel is authorized, add it to the global transmissions list + globals.Transmissions[p.StreamName] = currentTransmission + + services.AddTransmissionlog(p.StreamName, "Transmissão iniciada") + return c.SendString("On_Pub_Start: " + string(c.Body())) + } + + return fiber.ErrForbidden } func OnPubStop(c *fiber.Ctx) error { @@ -154,20 +101,19 @@ func OnPubStop(c *fiber.Ctx) error { return err } - services.AddTransmissionlog(p.StreamName, "Transmissão encerrada") + transm := globals.Transmissions[p.StreamName] + + now := time.Now() + duration := now.Sub(transm.StartTime) + + // TODO: Updates the transmission on database for future calculation of remaining daily time + + msg := fmt.Sprintf("Transmissão encerrada: Duration %s\n", duration) + + services.AddTransmissionlog(p.StreamName, msg) return c.SendString("On_Pub_Stop: " + string(c.Body())) } -type ResponseTest struct { - Data DataTest `json:"data"` -} - -type DataTest struct { - CNPJ string `json:"cnpj"` - Email string `json:"email"` - Name string `json:"name"` -} - func WixTest(c *fiber.Ctx) error { // Get the data from the callback diff --git a/globals/globals.go b/globals/globals.go index 1fd9ec5..29687ef 100644 --- a/globals/globals.go +++ b/globals/globals.go @@ -1,10 +1,16 @@ package globals -import "gorm.io/gorm" +import ( + "api/models" + + "gorm.io/gorm" +) var ( API_VERSION = "" API_RELEASE = "" DB *gorm.DB + + Transmissions map[string]models.CurrentTransmission ) diff --git a/main.go b/main.go index 456dcf8..5d1553d 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "api/database" "api/globals" "api/routes" + "api/services" "io" "log" "os" @@ -86,6 +87,14 @@ func main() { // Setup routes routes.Setup(app) + // Starts transmissions processing + ticker := time.NewTicker(1 * time.Minute) + go func() { + for range ticker.C { + services.VerifyTransmissionsLimits() + } + }() + port := strconv.Itoa(config.Configurations.Data.API_PORT) log.Println("Server started in port " + port) diff --git a/models/models.go b/models/models.go index 4bac2bd..cbe7b3a 100644 --- a/models/models.go +++ b/models/models.go @@ -229,8 +229,12 @@ func (TransmissionLog) TableName() string { return "transmissionlog" } +// Available only on RAM type CurrentTransmission struct { - Channel string - StartTime time.Time - Limit time.Time + Channel string + StartTime time.Time + Limit time.Time + SessionID string + PlanDailyLimit int + Watchers int } diff --git a/services/channelservices.go b/services/channelservices.go index 941caeb..aaddbd1 100644 --- a/services/channelservices.go +++ b/services/channelservices.go @@ -3,10 +3,26 @@ package services import ( "api/globals" "api/models" + "api/utils" "time" ) -func GetChannelByName(channelname string) (models.Channel, models.CurrentTransmission) { +func VerifyTransmissionsLimits() { + for channelname, currentTransmission := range globals.Transmissions { + // If the channel has no daily transmission limit, skip the verification (Channels with 24 hours daily limit) + if time.Duration(currentTransmission.PlanDailyLimit) == 1440 { + continue + } + + // TODO: Implementar verificação de limite de transmissão diária (ou seja, quanto já foi transmitido HOJE) + if time.Now().After(currentTransmission.Limit) { + AddTransmissionlog(channelname, "Limite de transmissão diária atingido") + utils.KickSession(channelname, currentTransmission.SessionID) + } + } +} + +func VerifyTransmissionAuthorization(channelname, sessionid, transmissionkey string) (bool, models.CurrentTransmission) { var ( channel models.Channel plan models.Plan @@ -18,8 +34,41 @@ func GetChannelByName(channelname string) (models.Channel, models.CurrentTransmi globals.DB.First(&plan, channel.PlanID) currentTransmission.Channel = channel.Name + currentTransmission.SessionID = sessionid + currentTransmission.PlanDailyLimit = plan.DailyLimitTransmission currentTransmission.StartTime = time.Now() + currentTransmission.Limit = currentTransmission.StartTime.Add(time.Duration(plan.DailyLimitTransmission) * time.Minute) - return channel, currentTransmission + // If the channel is not found, kick the session + if channel.ID == 0 { + AddTransmissionlog(channelname, "Canal inexistente") + utils.KickSession(channelname, sessionid) + return false, currentTransmission + } + + // If the channel expiration date is reached, kick the session + if channel.DateLimit.After(time.Now()) { + AddTransmissionlog(channelname, "Canal com data de expiração vencida") + utils.KickSession(channelname, sessionid) + return false, currentTransmission + } + + // If the channel is not active, kick the session + if channel.Status != "A" { + AddTransmissionlog(channelname, "Canal não está ativo") + utils.KickSession(channelname, sessionid) + return false, currentTransmission + } + + // If the transmission key does not match, kick the session + if transmissionkey != channel.TransmissionKey { + AddTransmissionlog(channelname, "Transmissão com chave de transmissão errada") + utils.KickSession(channelname, sessionid) + return false, currentTransmission + } + + // TODO: Save the transmission on database for future calculation of remaining daily time + + return true, currentTransmission } diff --git a/utils/utils.go b/utils/utils.go index 6cf82b3..e1fe1c9 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -3,6 +3,7 @@ package utils import ( "api/globals" "api/models" + "bytes" "crypto/rand" "encoding/json" "errors" @@ -257,6 +258,10 @@ func SendTestEmailApproval(email, name, transmkey, url string) { } func ParseTransmissionString(input string) (transmissionKey, password string) { + // Default values + transmissionKey = "" + password = "" + // Check if the string contains "&" parts := strings.SplitN(input, "&", 2) @@ -270,3 +275,19 @@ func ParseTransmissionString(input string) (transmissionKey, password string) { return transmissionKey, password } + +func KickSession(stream_name, session_id string) { + url := "http://localhost:8083/api/ctrl/kick_session" + + values := map[string]string{"stream_name": stream_name, "session_id": session_id} + + jsonValue, err := json.Marshal(values) + if err != nil { + log.Printf("Error Marshall KickSession: %s\n", err) + } + + _, err = http.Post(url, "application/json", bytes.NewBuffer(jsonValue)) + if err != nil { + log.Printf("Error Post KickSession: %s\n", err) + } +}