Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .env-template
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ KAFKA_URI=kafka_uri
AWS_ACCESS_KEY_ID=aws_access_key_id
AWS_SECRET_ACCESS_KEY=aws_secret_access_key
GCP_CREDENTIALS=gcp_credentials (the path of the json file)
AZURE_ACCESS_KEY=azure_access_key
AZURE_ACCESS_KEY=azure_access_key
LOG_DIR=""
LOG_LEVEL="info"
1 change: 1 addition & 0 deletions constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ const GCP_BUCKET_NAME = "zstream-bucket"
const GCP_PROJECT_ID = ""
const AZURE_ACCOUNT_NAME = ""
const AZURE_ENDPOINT = ""
const TRANSACTION_ID_KEY = "TRANSACTION_ID"
9 changes: 6 additions & 3 deletions controllers/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@ func PublishMessage(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}

jsonBytes, err := json.Marshal(request)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
message, err := utils.PublishMessage(configs.EnvVar[configs.KAFKA_URI], string(jsonBytes), "video")

message, err := utils.PublishMessage(c, configs.EnvVar[configs.KAFKA_URI], string(jsonBytes), "video")
if err != nil {
c.JSON(http.StatusExpectationFailed, gin.H{"error": message})
} else {
c.JSON(http.StatusCreated, gin.H{"status": message})
return
}

c.JSON(http.StatusCreated, gin.H{"status": message})
}
4 changes: 4 additions & 0 deletions controllers/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package controllers

import (
"net/http"
"zestream-server/logger"

"github.com/gin-gonic/gin"
)

func Ping(c *gin.Context) {
logger.Info(c, "ping-pong received", logger.Z{
"input": "output",
})
c.String(http.StatusOK, "pong")
}
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ require (
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/aws/aws-sdk-go v1.44.167
github.com/gin-gonic/gin v1.8.2
github.com/google/uuid v1.3.0
github.com/joho/godotenv v1.4.0
github.com/segmentio/kafka-go v0.4.38
github.com/stretchr/testify v1.8.1
github.com/u2takey/ffmpeg-go v0.4.1
go.uber.org/zap v1.24.0
)

require (
Expand All @@ -28,7 +30,6 @@ require (
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.7.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand All @@ -45,6 +46,8 @@ require (
github.com/u2takey/go-utils v0.3.1 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
Expand Down
19 changes: 17 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,29 @@ cloud.google.com/go/compute/metadata v0.2.1 h1:efOwf5ymceDhK6PKMnnrTHP4pppY5L22m
cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM=
cloud.google.com/go/iam v0.7.0 h1:k4MuwOsS7zGJJ+QfZ5vBK8SgHBAvYN/23BWsiihJ1vs=
cloud.google.com/go/iam v0.7.0/go.mod h1:H5Br8wRaDGNc8XP3keLc4unfUUZeyH3Sfl9XpQEYOeg=
cloud.google.com/go/longrunning v0.3.0 h1:NjljC+FYPV3uh5/OwWT6pVU+doBqMg2x/rZlE+CamDs=
cloud.google.com/go/storage v1.28.1 h1:F5QDG5ChchaAVQhINh24U99OWHURqrW8OmQcGKXcbgI=
cloud.google.com/go/storage v1.28.1/go.mod h1:Qnisd4CqDdo6BGs2AD5LLnEsmSQ80wQ5ogcBBKhU86Y=
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZtoxRXqUEzCfJt3+/Q=
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw=
github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74=
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg=
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/aws/aws-sdk-go v1.38.20/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.44.167 h1:kQmBhGdZkQLU7AiHShSkBJ15zr8agy0QeaxXduvyp2E=
github.com/aws/aws-sdk-go v1.44.167/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
Expand All @@ -35,6 +42,7 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
Expand Down Expand Up @@ -76,11 +84,11 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian/v3 v3.2.1 h1:d8MncMlErDFTwQGBK1xhv026j9kqhvw1Qv9IbWT1VLQ=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down Expand Up @@ -132,6 +140,7 @@ github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down Expand Up @@ -168,6 +177,13 @@ github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
gocv.io/x/gocv v0.25.0/go.mod h1:Rar2PS6DV+T4FL+PM535EImD/h13hGVaHhnCu1xarBs=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down Expand Up @@ -244,7 +260,6 @@ golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBn
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
Expand Down
163 changes: 163 additions & 0 deletions logger/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package logger

import (
"context"
"os"
"sync"
"zestream-server/constants"

"github.com/google/uuid"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type Z = map[string]interface{}

var once sync.Once
var singleton *zap.SugaredLogger

/*
Init initializes a thread-safe singleton logger
This would be called from a main method when the application starts up
This function would ideally, take zap configuration, but is left out
in favor of simplicity using the example logger.
*/
func Init(name string, logLevel string) {
// once ensures the singleton is initialized only once
once.Do(func() {
if name == "" {
name = "zestream-server"
}

// by default, this sets the minimum logging level to info
cfg := zap.NewProductionConfig()
cfg.Level.SetLevel(parseLogLevel(logLevel))

logDir := os.Getenv("LOG_DIR")

if logDir == "" {
logDir = "logs"
}

MakeDir(logDir, 0755)

outputFile := logDir + "/" + name + ".log"

// make the logDir if it does not exist

cfg.OutputPaths = []string{outputFile}

cfg.EncoderConfig.TimeKey = "logTime"
cfg.EncoderConfig.EncodeTime = zapcore.RFC3339TimeEncoder

cfg.EncoderConfig.MessageKey = "message"

cfg.EncoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder

builtLogger, _ := cfg.Build(zap.AddCallerSkip(2), zap.AddStacktrace(zapcore.DebugLevel))

singleton = builtLogger.Sugar()
})
}

func parseLogLevel(logLevel string) zapcore.Level {
switch logLevel {
case "debug":
{
return zapcore.DebugLevel
}
case "info":
{
return zapcore.InfoLevel
}
case "error":
{
return zapcore.ErrorLevel
}
case "warn":
{
return zapcore.WarnLevel
}
default:
{
return zapcore.InfoLevel
}
}
}

func MakeDir(path string, mode os.FileMode) {
if _, err := os.Stat(path); os.IsNotExist(err) {
os.Mkdir(path, mode)
}
}

func Info(ctx context.Context, msg string, data Z) {
logIt(ctx, zapcore.InfoLevel, msg, data)
}

func Debug(ctx context.Context, msg string, data Z) {
logIt(ctx, zapcore.DebugLevel, msg, data)
}
func Error(ctx context.Context, msg string, data Z) {
logIt(ctx, zapcore.ErrorLevel, msg, data)
}

func logIt(ctx context.Context, level zapcore.Level, message string, data Z) {
if singleton == nil {
Init("", os.Getenv("LOG_LEVEL"))
}

if ctx == nil {
ctx = context.TODO()
}

modifiedArgs := ingestData(ctx, data)

switch level {
case zapcore.ErrorLevel:
{
singleton.Errorw(message, modifiedArgs...)
break
}
case zapcore.WarnLevel:
{
singleton.Warnw(message, modifiedArgs...)
break
}
case zapcore.InfoLevel:
{
singleton.Infow(message, modifiedArgs...)
break
}
case zapcore.DebugLevel:
{
singleton.Debugw(message, modifiedArgs...)
break
}
}
}

func ingestData(ctx context.Context, data Z) []interface{} {
if data == nil {
data = Z{}
}

data[constants.TRANSACTION_ID_KEY] = ctx.Value(constants.TRANSACTION_ID_KEY)

if data[constants.TRANSACTION_ID_KEY] == nil || data[constants.TRANSACTION_ID_KEY] == "" {
data[constants.TRANSACTION_ID_KEY] = uuid.New().String()
}

// organize Data in key, value, key, value... in an array of interface
argsLen := len(data) * 2 // key + value
args := make([]interface{}, argsLen)

i := 0
for k, v := range data {
args[i] = k
args[i+1] = v
i += 2
}

return args
}
10 changes: 8 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package main

import (
"context"
"fmt"
"log"
"net/http"
"os"
"zestream-server/configs"
"zestream-server/constants"
"zestream-server/logger"
"zestream-server/routes"
"zestream-server/service"
"zestream-server/utils"
Expand All @@ -20,13 +21,18 @@ func dev() {
func main() {
configs.LoadEnv()

logger.Init("zestream-server", os.Getenv("LOG_LEVEL"))

r := routes.Init()

port := os.Getenv(constants.PORT)

kafkaURI := os.Getenv("KAFKA_URI")
if kafkaURI == "" {
log.Fatal("Error: KAFKA_URI environment variable not set")
logger.Error(context.TODO(), "KAFKA_URI environment variable not set", logger.Z{
"kafka_uri": os.Getenv("KAFKA_URI"),
})
panic("KAFKA_URI environment variable not set")
}

if port == "" {
Expand Down
19 changes: 17 additions & 2 deletions routes/routes.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package routes

import (
"zestream-server/constants"
"zestream-server/controllers"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/gin-gonic/gin"
"zestream-server/constants"
"zestream-server/controllers"
"github.com/google/uuid"
)

// Init function will perform all route operations
Expand All @@ -27,6 +29,19 @@ func Init() *gin.Engine {
}
})

// middleware attaching a new transaction id for every request to context
// transaction id will help to track all logs releated to any request better
r.Use(func(ctx *gin.Context) {
transactionID := ctx.Request.Header.Get("transaction-id")

if transactionID == "" {
transactionID = uuid.New().String()
}

ctx.Set(constants.TRANSACTION_ID_KEY, transactionID)
ctx.Next()
})

// Create a new session
sess, err := session.NewSession(&aws.Config{
Region: aws.String(constants.S3_REGION),
Expand Down
Loading