blocking channels

main
nilo 2025-03-23 17:58:22 -03:00
parent 9b2936c810
commit 598804e95c
6 changed files with 123 additions and 88 deletions

View File

@ -5,29 +5,22 @@ import (
"api/models"
"api/services"
"api/utils"
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"time"
"github.com/gofiber/fiber/v2"
)
func KickSession(stream_name, session_id string) {
url := "http://localhost:8083/api/ctrl/kick_session"
type ResponseTest struct {
Data DataTest `json:"data"`
}
values := map[string]string{"stream_name": stream_name, "session_id": session_id}
jsonValue, err := json.Marshal(values)
if err != nil {
log.Printf("Error Marshall KickSession: %s\n", err)
}
_, err = http.Post(url, "application/json", bytes.NewBuffer(jsonValue))
if err != nil {
log.Printf("Error Post KickSession: %s\n", err)
}
type DataTest struct {
CNPJ string `json:"cnpj"`
Email string `json:"email"`
Name string `json:"name"`
}
func ServerStart(c *fiber.Ctx) error {
@ -47,19 +40,6 @@ func ServerStart(c *fiber.Ctx) error {
return c.SendString("Server started: " + string(c.Body()))
}
// func OnRtmpConnect(c *fiber.Ctx) error {
// body := new(models.Connect)
// if err := c.BodyParser(body); err != nil {
// log.Printf("Error Start: %s\n", err)
// return err
// }
// utils.PrettyPrintJson(body)
// return c.SendString("On_Rtmp_Connect: " + string(c.Body()))
// }
// func OnUpdate(c *fiber.Ctx) error {
// p := new(models.Update)
// if err := c.BodyParser(p); err != nil {
@ -79,25 +59,6 @@ func ServerStart(c *fiber.Ctx) error {
// return c.SendString("On_Update: " + string(c.Body()))
// }
// func OnSubStart(c *fiber.Ctx) error {
// p := new(models.Update)
// if err := c.BodyParser(p); err != nil {
// log.Printf("Error SubStart: %s\n", err)
// return err
// }
// log.Printf("SubStart")
// utils.PrettyPrintJson(p)
// // if len(p.Groups) > 0 {
// // for _, g := range p.Groups {
// // log.Printf("Update %s %s [(%dx%d) %d]\n", g.Channel, g.UpdPub.StartTime, g.VideoWidth, g.VideoHeight, g.UpdPub.ReadBytesSum)
// // }
// // }
// return c.SendString("On_Substart: " + string(c.Body()))
// }
func OnPubStart(c *fiber.Ctx) error {
// ================================================================
// Called when a publisher starts streaming - Start of Transmission
@ -114,35 +75,21 @@ func OnPubStart(c *fiber.Ctx) error {
log.Printf("======================== Start StreamName %s, Transmission Key %s, PlayerKey %s, SessionId %s\n", p.StreamName, transmissionkey, playerkey, p.SessionId)
// Get the channel from the database
channel, currentTransmission := services.GetChannelByName(p.StreamName)
ok, currentTransmission := services.VerifyTransmissionAuthorization(p.StreamName, p.SessionId, transmissionkey)
log.Printf("%v\n", currentTransmission)
// If the channel is not found, kick the session
if channel.ID == 0 {
services.AddTransmissionlog(p.StreamName, "Canal inexistente")
KickSession(p.StreamName, p.SessionId)
return fiber.ErrForbidden
}
// If the channel is not active, kick the session
if channel.Status != "A" {
services.AddTransmissionlog(p.StreamName, "Canal não está ativo")
KickSession(p.StreamName, p.SessionId)
return fiber.ErrForbidden
}
// If the transmission key does not match, kick the session
if transmissionkey != channel.TransmissionKey {
services.AddTransmissionlog(p.StreamName, "Transmissão com chave de transmissão errada")
KickSession(p.StreamName, p.SessionId)
return fiber.ErrForbidden
}
// Check if the channel has authorized time to transmit
services.AddTransmissionlog(p.StreamName, "Transmissão iniciada")
return c.SendString("On_Pub_Start: " + string(c.Body()))
if ok {
// If the channel is authorized, add it to the global transmissions list
globals.Transmissions[p.StreamName] = currentTransmission
services.AddTransmissionlog(p.StreamName, "Transmissão iniciada")
return c.SendString("On_Pub_Start: " + string(c.Body()))
}
return fiber.ErrForbidden
}
func OnPubStop(c *fiber.Ctx) error {
@ -154,20 +101,19 @@ func OnPubStop(c *fiber.Ctx) error {
return err
}
services.AddTransmissionlog(p.StreamName, "Transmissão encerrada")
transm := globals.Transmissions[p.StreamName]
now := time.Now()
duration := now.Sub(transm.StartTime)
// TODO: Updates the transmission on database for future calculation of remaining daily time
msg := fmt.Sprintf("Transmissão encerrada: Duration %s\n", duration)
services.AddTransmissionlog(p.StreamName, msg)
return c.SendString("On_Pub_Stop: " + string(c.Body()))
}
type ResponseTest struct {
Data DataTest `json:"data"`
}
type DataTest struct {
CNPJ string `json:"cnpj"`
Email string `json:"email"`
Name string `json:"name"`
}
func WixTest(c *fiber.Ctx) error {
// Get the data from the callback

View File

@ -1,10 +1,16 @@
package globals
import "gorm.io/gorm"
import (
"api/models"
"gorm.io/gorm"
)
var (
API_VERSION = ""
API_RELEASE = ""
DB *gorm.DB
Transmissions map[string]models.CurrentTransmission
)

View File

@ -5,6 +5,7 @@ import (
"api/database"
"api/globals"
"api/routes"
"api/services"
"io"
"log"
"os"
@ -86,6 +87,14 @@ func main() {
// Setup routes
routes.Setup(app)
// Starts transmissions processing
ticker := time.NewTicker(1 * time.Minute)
go func() {
for range ticker.C {
services.VerifyTransmissionsLimits()
}
}()
port := strconv.Itoa(config.Configurations.Data.API_PORT)
log.Println("Server started in port " + port)

View File

@ -229,8 +229,12 @@ func (TransmissionLog) TableName() string {
return "transmissionlog"
}
// Available only on RAM
type CurrentTransmission struct {
Channel string
StartTime time.Time
Limit time.Time
Channel string
StartTime time.Time
Limit time.Time
SessionID string
PlanDailyLimit int
Watchers int
}

View File

@ -3,10 +3,26 @@ package services
import (
"api/globals"
"api/models"
"api/utils"
"time"
)
func GetChannelByName(channelname string) (models.Channel, models.CurrentTransmission) {
func VerifyTransmissionsLimits() {
for channelname, currentTransmission := range globals.Transmissions {
// If the channel has no daily transmission limit, skip the verification (Channels with 24 hours daily limit)
if time.Duration(currentTransmission.PlanDailyLimit) == 1440 {
continue
}
// TODO: Implementar verificação de limite de transmissão diária (ou seja, quanto já foi transmitido HOJE)
if time.Now().After(currentTransmission.Limit) {
AddTransmissionlog(channelname, "Limite de transmissão diária atingido")
utils.KickSession(channelname, currentTransmission.SessionID)
}
}
}
func VerifyTransmissionAuthorization(channelname, sessionid, transmissionkey string) (bool, models.CurrentTransmission) {
var (
channel models.Channel
plan models.Plan
@ -18,8 +34,41 @@ func GetChannelByName(channelname string) (models.Channel, models.CurrentTransmi
globals.DB.First(&plan, channel.PlanID)
currentTransmission.Channel = channel.Name
currentTransmission.SessionID = sessionid
currentTransmission.PlanDailyLimit = plan.DailyLimitTransmission
currentTransmission.StartTime = time.Now()
currentTransmission.Limit = currentTransmission.StartTime.Add(time.Duration(plan.DailyLimitTransmission) * time.Minute)
return channel, currentTransmission
// If the channel is not found, kick the session
if channel.ID == 0 {
AddTransmissionlog(channelname, "Canal inexistente")
utils.KickSession(channelname, sessionid)
return false, currentTransmission
}
// If the channel expiration date is reached, kick the session
if channel.DateLimit.After(time.Now()) {
AddTransmissionlog(channelname, "Canal com data de expiração vencida")
utils.KickSession(channelname, sessionid)
return false, currentTransmission
}
// If the channel is not active, kick the session
if channel.Status != "A" {
AddTransmissionlog(channelname, "Canal não está ativo")
utils.KickSession(channelname, sessionid)
return false, currentTransmission
}
// If the transmission key does not match, kick the session
if transmissionkey != channel.TransmissionKey {
AddTransmissionlog(channelname, "Transmissão com chave de transmissão errada")
utils.KickSession(channelname, sessionid)
return false, currentTransmission
}
// TODO: Save the transmission on database for future calculation of remaining daily time
return true, currentTransmission
}

View File

@ -3,6 +3,7 @@ package utils
import (
"api/globals"
"api/models"
"bytes"
"crypto/rand"
"encoding/json"
"errors"
@ -257,6 +258,10 @@ func SendTestEmailApproval(email, name, transmkey, url string) {
}
func ParseTransmissionString(input string) (transmissionKey, password string) {
// Default values
transmissionKey = ""
password = ""
// Check if the string contains "&"
parts := strings.SplitN(input, "&", 2)
@ -270,3 +275,19 @@ func ParseTransmissionString(input string) (transmissionKey, password string) {
return transmissionKey, password
}
func KickSession(stream_name, session_id string) {
url := "http://localhost:8083/api/ctrl/kick_session"
values := map[string]string{"stream_name": stream_name, "session_id": session_id}
jsonValue, err := json.Marshal(values)
if err != nil {
log.Printf("Error Marshall KickSession: %s\n", err)
}
_, err = http.Post(url, "application/json", bytes.NewBuffer(jsonValue))
if err != nil {
log.Printf("Error Post KickSession: %s\n", err)
}
}