From c619baf14b7be27ea1bd2a5c421514f3c5eb940d Mon Sep 17 00:00:00 2001 From: Hiep Date: Wed, 5 Aug 2020 00:26:08 +0800 Subject: [PATCH] Implement all repository functions and move away from mongoClient Part of #34 --- odin-engine/api/execute.go | 11 +- odin-engine/api/jobs.go | 131 +-- odin-engine/api/links.go | 26 +- odin-engine/api/main.go | 24 +- odin-engine/api/stats.go | 44 +- odin-engine/go.mod | 2 + odin-engine/go.sum | 4 + odin-engine/pkg/executor/jobNode.go | 17 +- odin-engine/pkg/executor/main.go | 18 +- odin-engine/pkg/executor/queue.go | 5 +- odin-engine/pkg/jobs/client.go | 104 ++ odin-engine/pkg/jobs/mongoClient.go | 316 ------- odin-engine/pkg/jobs/setup.go | 4 +- odin-engine/pkg/jobs/ticker.go | 25 +- odin-engine/pkg/jobs/ticker_test.go | 3 +- odin-engine/pkg/repository/nosql/nosql.go | 319 ++++++- .../pkg/repository/nosql/nosql_test.go | 891 +++++++++++++++++- odin-engine/pkg/repository/repository.go | 50 +- 18 files changed, 1495 insertions(+), 499 deletions(-) create mode 100644 odin-engine/pkg/jobs/client.go delete mode 100644 odin-engine/pkg/jobs/mongoClient.go diff --git a/odin-engine/api/execute.go b/odin-engine/api/execute.go index 402fc49..fbb4dc2 100644 --- a/odin-engine/api/execute.go +++ b/odin-engine/api/execute.go @@ -5,7 +5,8 @@ import ( "github.com/theycallmemac/odin/odin-engine/pkg/executor" "github.com/theycallmemac/odin/odin-engine/pkg/fsm" - "github.com/valyala/fasthttp" + "github.com/theycallmemac/odin/odin-engine/pkg/repository" + "github.com/valyala/fasthttp" ) // ExecNode is a type to be used to unmarshal data into after a HTTP request @@ -16,15 +17,15 @@ type ExecNode struct { } // Executor is used to execute the item at the head of the job queue -func Executor(ctx *fasthttp.RequestCtx) { +func Executor(repo repository.Repository, ctx *fasthttp.RequestCtx) { var en ExecNode json.Unmarshal(ctx.PostBody(), &en) - go executor.Execute(en.Items, 0, HTTPAddr, en.Store) + go executor.Execute(repo, en.Items, 0, HTTPAddr, en.Store) } // ExecuteYaml is used to execute a job passed to the command line tool -func ExecuteYaml(ctx *fasthttp.RequestCtx) { +func ExecuteYaml(repo repository.Repository, ctx *fasthttp.RequestCtx) { var en ExecNode json.Unmarshal(ctx.PostBody(), &en) - go executor.Execute(en.Items, 1, HTTPAddr, en.Store) + go executor.Execute(repo, en.Items, 1, HTTPAddr, en.Store) } diff --git a/odin-engine/api/jobs.go b/odin-engine/api/jobs.go index 47d70a6..4911e17 100644 --- a/odin-engine/api/jobs.go +++ b/odin-engine/api/jobs.go @@ -2,124 +2,125 @@ package api import ( "bytes" - "fmt" + "fmt" "io/ioutil" "os" "strconv" "strings" "github.com/theycallmemac/odin/odin-engine/pkg/jobs" + "github.com/theycallmemac/odin/odin-engine/pkg/repository" "github.com/theycallmemac/odin/odin-engine/pkg/resources" - "github.com/valyala/fasthttp" - - "go.mongodb.org/mongo-driver/bson" + "github.com/valyala/fasthttp" ) // AddJob is used to create a new job -func AddJob(ctx *fasthttp.RequestCtx) { - body := ctx.PostBody() +func AddJob(repo repository.Repository, ctx *fasthttp.RequestCtx) { + body := ctx.PostBody() path := jobs.SetupEnvironment(body) - client, err := jobs.SetupClient() + id, err := repo.CreateJob(ctx, body, path, "") if err != nil { - fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n") + fmt.Fprintf(ctx, "[FAILED] Job failed to deploy: %v\n", err) } else { - status := jobs.InsertIntoMongo(client, body, path, "") - fmt.Fprintf(ctx, status) + fmt.Fprintf(ctx, "[SUCCESS] Job (%s) deployed successfully\n", id) } } // DeleteJob is used to delete a job -func DeleteJob(ctx *fasthttp.RequestCtx) { +func DeleteJob(repo repository.Repository, ctx *fasthttp.RequestCtx) { args := strings.Split(string(ctx.PostBody()), " ") id, uid := args[0], args[1] - client, err := jobs.SetupClient() - if err != nil { - fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n") + os.RemoveAll("/etc/odin/jobs/" + id) + os.RemoveAll("/etc/odin/logs/" + id) + if err := repo.DeleteJob(ctx, id, uid); err != nil { + fmt.Fprintf(ctx, "[FAILED] Failed to remove job (%s): %v\n", id, err) } else { - os.RemoveAll("/etc/odin/jobs/" + id) - os.RemoveAll("/etc/odin/logs/" + id) - if jobs.DeleteJobByValue(client, bson.M{"id": id}, uid) { - fmt.Fprintf(ctx, "Job removed!\n") - } else { - fmt.Fprintf(ctx, "Job with that ID does not exist!\n") - } + fmt.Fprintf(ctx, "[SUCCESS] Job (%s) removed\n", id) } } // UpdateJob is used to update a job -func UpdateJob(ctx *fasthttp.RequestCtx) { +func UpdateJob(repo repository.Repository, ctx *fasthttp.RequestCtx) { args := strings.Split(string(ctx.PostBody()), "_") id, name, description, schedule, uid := args[0], args[1], args[2], args[3], args[4] - client, err := jobs.SetupClient() - if err != nil { - fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n") + job := &repository.Job{ + ID: id, + UID: uid, + } + if resources.NotEmpty(name) { + job.Name = name + } + if resources.NotEmpty(description) { + job.Description = description + } + if resources.NotEmpty(schedule) { + ioutil.WriteFile(".tmp.yml", []byte("provider:\n name: 'odin'\n version: '1.0.0'\njob:\n name: ''\n description: ''\n language: ''\n file: ''\n schedule: "+schedule+"\n\n"), 0654) + resp := jobs.MakePostRequest("http://localhost:3939/schedule", bytes.NewBuffer([]byte(".tmp.yml"))) + os.Remove(".tmp.yml") + job.Schedule = resp + } + if err := repo.UpdateJob(ctx, job); err != nil { + fmt.Fprintf(ctx, "[FAILED] Failed to update job (%s): %v\n", job.ID, err) } else { - job := jobs.GetJobByValue(client, bson.M{"id": id}, uid) - if resources.NotEmpty(name) { - job.Name = name - } - if resources.NotEmpty(description) { - job.Description = description - } - if resources.NotEmpty(schedule) { - ioutil.WriteFile(".tmp.yml", []byte("provider:\n name: 'odin'\n version: '1.0.0'\njob:\n name: ''\n description: ''\n language: ''\n file: ''\n schedule: "+schedule+"\n\n"), 0654) - resp := jobs.MakePostRequest("http://localhost:3939/schedule", bytes.NewBuffer([]byte(".tmp.yml"))) - os.Remove(".tmp.yml") - job.Schedule = resp - } - _ = jobs.UpdateJobByValue(client, job) - fmt.Fprintf(ctx, "Updated job " + id + " successfully\n") + fmt.Fprintf(ctx, "[SUCCESS] Job (%s) updated\n", id) } } // GetJobDescription is used to show a job's description -func GetJobDescription(ctx *fasthttp.RequestCtx) { +func GetJobDescription(repo repository.Repository, ctx *fasthttp.RequestCtx) { args := strings.Split(string(ctx.PostBody()), "_") id, uid := args[0], args[1] - client, err := jobs.SetupClient() + job, err := repo.GetJobById(ctx, id, uid) if err != nil { - fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n") + fmt.Fprintf(ctx, "[FAILED] Failed to get job (%s): %v", id, err) } else { - job := jobs.GetJobByValue(client, bson.M{"id": id}, uid) - fmt.Fprintf(ctx, job.Name + " - " + job.Description + "\n") + fmt.Fprintf(ctx, job.Name+" - "+job.Description+"\n") } } // UpdateJobRuns is used to update a job's run number -func UpdateJobRuns(ctx *fasthttp.RequestCtx) { +// TODO: Get and update job should be done as a transaction +func UpdateJobRuns(repo repository.Repository, ctx *fasthttp.RequestCtx) { args := strings.Split(string(ctx.PostBody()), " ") id, runs, uid := args[0], args[1], args[2] - client, err := jobs.SetupClient() + job, err := repo.GetJobById(ctx, id, uid) + if err != nil { + fmt.Fprintf(ctx, "[FAILED] Failed to update job (%s): %v\n", id, err) + return + } + inc, err := strconv.Atoi(runs) if err != nil { - fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n") + fmt.Fprint(ctx, "Invalid run") + return + } + job.Runs = job.Runs + inc + if err := repo.UpdateJob(ctx, job); err != nil { + fmt.Fprintf(ctx, "[FAILED] Failed to update job (%s): %v\n", job.ID, err) } else { - job := jobs.GetJobByValue(client, bson.M{"id": id}, uid) - inc, _ := strconv.Atoi(runs) - job.Runs = job.Runs + inc - _ = jobs.UpdateJobByValue(client, job) + fmt.Fprintf(ctx, "[SUCCESS] Job (%s) updated\n", id) } } // ListJobs is used to list the current jobs running -func ListJobs(ctx *fasthttp.RequestCtx) { - client, err := jobs.SetupClient() +func ListJobs(repo repository.Repository, ctx *fasthttp.RequestCtx) { + uid := string(ctx.PostBody()) + jobList, err := repo.GetUserJobs(ctx, uid) if err != nil { - fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n") - } else { - jobList := jobs.GetUserJobs(client, string(ctx.PostBody())) - fmt.Fprintf(ctx, jobs.SchFormat("ID", "NAME", "DESCRIPTION", "LANGUAGE", "LINKS", "SCHEDULE")) - for _, job := range jobList { - linkLen := len(job.Links) - 1 - if linkLen < 0 { - linkLen = 0 - } - fmt.Fprintf(ctx, jobs.SchFormat(job.ID, job.Name, job.Description, job.Language, job.Links[:linkLen], job.Schedule[:len(job.Schedule)-1])) + fmt.Fprintf(ctx, "[FAILED] Failed to get jobs for user %s\n", uid) + return + } + fmt.Fprintf(ctx, jobs.SchFormat("ID", "NAME", "DESCRIPTION", "LANGUAGE", "LINKS", "SCHEDULE")) + for _, job := range jobList { + linkLen := len(job.Links) - 1 + if linkLen < 0 { + linkLen = 0 } + fmt.Fprintf(ctx, jobs.SchFormat(job.ID, job.Name, job.Description, job.Language, job.Links[:linkLen], job.Schedule[:len(job.Schedule)-1])) } } // GetJobLogs is used to retrieve the logs for a job func GetJobLogs(ctx *fasthttp.RequestCtx) { log, _ := ioutil.ReadFile("/etc/odin/logs/" + string(ctx.PostBody())) - fmt.Fprintf(ctx, "\n" + string(log) + "\n") + fmt.Fprintf(ctx, "\n"+string(log)+"\n") } diff --git a/odin-engine/api/links.go b/odin-engine/api/links.go index 6ef356f..202d348 100644 --- a/odin-engine/api/links.go +++ b/odin-engine/api/links.go @@ -1,35 +1,31 @@ package api import ( - "fmt" + "fmt" "strings" - "github.com/theycallmemac/odin/odin-engine/pkg/jobs" - "github.com/valyala/fasthttp" + "github.com/theycallmemac/odin/odin-engine/pkg/repository" + "github.com/valyala/fasthttp" ) // LinkJobs is used to link two jobs together -func LinkJobs(ctx *fasthttp.RequestCtx) { +func LinkJobs(repo repository.Repository, ctx *fasthttp.RequestCtx) { split := strings.Split(string(ctx.PostBody()), "_") from, to, uid := split[0], split[1], split[2] - client, _ := jobs.SetupClient() - updated := jobs.AddJobLink(client, from, to, uid) - if updated == 1 { - fmt.Fprintf(ctx, "Job " + from + " linked to " + to + "!\n") + if err := repo.AddJobLink(ctx, from, to, uid); err != nil { + fmt.Fprintf(ctx, "[FAILED] Job %s could not be linked to %s: %v\n", from, to, err) } else { - fmt.Fprintf(ctx, "Job " + from + " could not be linked to " + to + ".\n") + fmt.Fprintf(ctx, "[SUCCESS] Job %s linked to %s\n", from, to) } } // UnlinkJobs is used to delete a job link -func UnlinkJobs(ctx *fasthttp.RequestCtx) { +func UnlinkJobs(repo repository.Repository, ctx *fasthttp.RequestCtx) { split := strings.Split(string(ctx.PostBody()), "_") from, to, uid := split[0], split[1], split[2] - client, _ := jobs.SetupClient() - updated := jobs.DeleteJobLink(client, from, to, uid) - if updated == 1 { - fmt.Fprintf(ctx, "Job " + to + " unlinked from " + from + "!\n") + if err := repo.DeleteJobLink(ctx, from, to, uid); err != nil { + fmt.Fprintf(ctx, "[FAILED] Link %s could not be unlinked from %s: %v\n", to, from, err) } else { - fmt.Fprintf(ctx, "Job " + to + " has no links!\n") + fmt.Fprintf(ctx, "[SUCCESS] Link %s unlinked from %s\n", to, from) } } diff --git a/odin-engine/api/main.go b/odin-engine/api/main.go index a087460..8249d5f 100644 --- a/odin-engine/api/main.go +++ b/odin-engine/api/main.go @@ -56,38 +56,38 @@ func (service *Service) Start() { case "/cluster/leave": service.LeaveCluster(ctx) case "/execute": - Executor(ctx) + Executor(service.repo, ctx) case "/execute/yaml": - ExecuteYaml(ctx) + ExecuteYaml(service.repo, ctx) case "/jobs/add": - AddJob(ctx) + AddJob(service.repo, ctx) case "/jobs/delete": - DeleteJob(ctx) + DeleteJob(service.repo, ctx) case "/jobs/info/update": - UpdateJob(ctx) + UpdateJob(service.repo, ctx) case "/jobs/info/description": - GetJobDescription(ctx) + GetJobDescription(service.repo, ctx) case "/jobs/info/runs": - UpdateJobRuns(ctx) + UpdateJobRuns(service.repo, ctx) case "/jobs/list": - ListJobs(ctx) + ListJobs(service.repo, ctx) case "/jobs/logs": GetJobLogs(ctx) case "/links/add": - LinkJobs(ctx) + LinkJobs(service.repo, ctx) case "/links/delete": - UnlinkJobs(ctx) + UnlinkJobs(service.repo, ctx) case "/schedule": GetJobSchedule(ctx) case "/stats/add": - AddJobStats(ctx) + AddJobStats(service.repo, ctx) case "/stats/get": GetJobStats(service.repo, ctx) } } // start the countdown timer for the execution until the first job - go jobs.StartTicker(service.store, service.addr) + go jobs.StartTicker(service.repo, service.store, service.addr) HTTPAddr = service.addr fasthttp.ListenAndServe(HTTPAddr, routes) diff --git a/odin-engine/api/stats.go b/odin-engine/api/stats.go index 2c7c1b1..6277295 100644 --- a/odin-engine/api/stats.go +++ b/odin-engine/api/stats.go @@ -1,15 +1,12 @@ package api import ( - "context" "fmt" "strings" "github.com/theycallmemac/odin/odin-engine/pkg/jobs" "github.com/theycallmemac/odin/odin-engine/pkg/repository" "github.com/valyala/fasthttp" - - "go.mongodb.org/mongo-driver/mongo" ) // JobStats is a type to be used for accessing and storing job stats information @@ -22,45 +19,28 @@ type JobStats struct { } // AddJobStats is used to parse collected metrics -func AddJobStats(ctx *fasthttp.RequestCtx) { +func AddJobStats(repo repository.Repository, ctx *fasthttp.RequestCtx) { args := strings.Split(string(ctx.PostBody()), ",") typeOfValue, desc, value, id, timestamp := args[0], args[1], args[2], args[3], args[4] - client, err := jobs.SetupClient() - if err != nil { - fmt.Fprintf(ctx, "MongoDB cannot be accessed at the moment\n") - } else { - if InsertIntoMongo(client, typeOfValue, desc, value, id, timestamp) { - fmt.Fprintf(ctx, "200") - } else { - fmt.Fprintf(ctx, "500") - } + js := &repository.JobStats{ + ID: id, + Description: desc, + Type: typeOfValue, + Value: value, + Timestamp: timestamp, } -} - -// InsertIntoMongo is used to add collected metrics to the observability collection -// parameters: client (a *mongo.Client), typeOfValue (a string of the type of value being stored), desc (a string describing the value being stored), value (a string of the value being stored), id (a string of the associated Job ID), timestamp (a string of the unix time at which the operation took place) -// returns: bool (true is successful, false if otherwise) -func InsertIntoMongo(client *mongo.Client, typeOfValue string, desc string, value string, id string, timestamp string) bool { - var js JobStats - js.ID = id - js.Description = desc - js.Type = typeOfValue - js.Value = value - js.Timestamp = timestamp - collection := client.Database("odin").Collection("observability") - _, err := collection.InsertOne(context.TODO(), js) - client.Disconnect(context.TODO()) - if err != nil { - return false + if err := repo.CreateJobStats(ctx, js); err != nil { + fmt.Fprintf(ctx, "500") + } else { + fmt.Fprintf(ctx, "200") } - return true } // GetJobStats is used to show stats collected by a specified job func GetJobStats(repo repository.Repository, ctx *fasthttp.RequestCtx) { statsList, err := repo.GetJobStats(ctx, string(ctx.PostBody())) if err != nil { - fmt.Fprintf(ctx, "[ERROR] Cannot get job stats: %v", err) + fmt.Fprintf(ctx, "[FAILED] Cannot get job stats: %v\n", err) } else { for _, stat := range statsList { fmt.Fprintf(ctx, jobs.Format(stat.ID, stat.Description, stat.Type, stat.Value)) diff --git a/odin-engine/go.mod b/odin-engine/go.mod index 9f94482..5e768c4 100644 --- a/odin-engine/go.mod +++ b/odin-engine/go.mod @@ -2,6 +2,8 @@ module github.com/theycallmemac/odin/odin-engine go 1.13 +replace github.com/hidal-go/hidalgo => github.com/hiepd/hidalgo v0.0.0-20200806161247-e4a84db22d06 + require ( github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 github.com/hashicorp/raft v1.1.2 diff --git a/odin-engine/go.sum b/odin-engine/go.sum index a4af58a..f24845f 100644 --- a/odin-engine/go.sum +++ b/odin-engine/go.sum @@ -116,6 +116,10 @@ github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7 github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= github.com/hidal-go/hidalgo v0.0.0-20190814174001-42e03f3b5eaa h1:hBE4LGxApbZiV/3YoEPv7uYlUMWOogG1hwtkpiU87zQ= github.com/hidal-go/hidalgo v0.0.0-20190814174001-42e03f3b5eaa/go.mod h1:bPkrxDlroXxigw8BMWTEPTv4W5/rQwNgg2BECXsgyX0= +github.com/hiepd/hidalgo v0.0.0-20190814174001-42e03f3b5eaa h1:PmwFwRyxsWbp6bGa+cNbfDdJm3cyz41ut83kblKc6o8= +github.com/hiepd/hidalgo v0.0.0-20190814174001-42e03f3b5eaa/go.mod h1:bPkrxDlroXxigw8BMWTEPTv4W5/rQwNgg2BECXsgyX0= +github.com/hiepd/hidalgo v0.0.0-20200806161247-e4a84db22d06 h1:ZKIptNmc3Mfb8pNDMz3kM2dZAWliF53qZFno8XcqbRw= +github.com/hiepd/hidalgo v0.0.0-20200806161247-e4a84db22d06/go.mod h1:bPkrxDlroXxigw8BMWTEPTv4W5/rQwNgg2BECXsgyX0= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= diff --git a/odin-engine/pkg/executor/jobNode.go b/odin-engine/pkg/executor/jobNode.go index 7675ae1..7ef97ab 100644 --- a/odin-engine/pkg/executor/jobNode.go +++ b/odin-engine/pkg/executor/jobNode.go @@ -11,6 +11,7 @@ import ( "github.com/theycallmemac/odin/odin-engine/pkg/fsm" "github.com/theycallmemac/odin/odin-engine/pkg/jobs" + "github.com/theycallmemac/odin/odin-engine/pkg/repository" "github.com/theycallmemac/odin/odin-engine/pkg/resources" ) @@ -32,12 +33,12 @@ func (job JobNode) logger(ch chan<- Data, data []byte, err error, store fsm.Stor logStatus = "[FAILED]" } t := time.Now() - f, _ := os.OpenFile("/etc/odin/logs/" + job.ID, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) - _, err := f.WriteString(t.Format("Jan 2 15:04:05") + " " + logStatus + ": " + job.ID + " on " + store.ServerID + "\n") - if err != nil { - fmt.Println(err) - f.Close() - } + f, _ := os.OpenFile("/etc/odin/logs/"+job.ID, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + _, err := f.WriteString(t.Format("Jan 2 15:04:05") + " " + logStatus + ": " + job.ID + " on " + store.ServerID + "\n") + if err != nil { + fmt.Println(err) + f.Close() + } ch <- Data{ error: err, output: data, @@ -48,7 +49,7 @@ func (job JobNode) logger(ch chan<- Data, data []byte, err error, store fsm.Stor // runCommand is called on a JobNode type and is used to run a job like a shell would run a command // parameters: ch (channel used to return data), store (a store of node information) // returns: nil -func (job JobNode) runCommand(ch chan<- Data, httpAddr string, store fsm.Store) { +func (job JobNode) runCommand(repo repository.Repository, ch chan<- Data, httpAddr string, store fsm.Store) { URI := resources.UnmarsharlYaml(resources.ReadFileBytes(getHome() + "/odin-config.yml")).Mongo.Address os.Setenv("ODIN_EXEC_ENV", "True") os.Setenv("ODIN_MONGODB", URI) @@ -66,7 +67,7 @@ func (job JobNode) runCommand(ch chan<- Data, httpAddr string, store fsm.Store) } if job.Links != "" { links := strings.Split(job.Links, ",") - jobs.RunLinks(links, job.UID, httpAddr, store) + jobs.RunLinks(repo, links, job.UID, httpAddr, store) } job.logger(ch, data, err, store) } diff --git a/odin-engine/pkg/executor/main.go b/odin-engine/pkg/executor/main.go index 2e07983..3f1aaad 100644 --- a/odin-engine/pkg/executor/main.go +++ b/odin-engine/pkg/executor/main.go @@ -11,13 +11,13 @@ import ( "strings" "github.com/theycallmemac/odin/odin-engine/pkg/fsm" + "github.com/theycallmemac/odin/odin-engine/pkg/repository" "github.com/theycallmemac/odin/odin-engine/pkg/resources" ) // Queue is a type used to contain an array of JobNode's type Queue []JobNode - // JobNode is a type used to define a job as a node in a Queue type JobNode struct { ID string @@ -30,7 +30,7 @@ type JobNode struct { Links string } -// Data is a type used to define channels for execution +// Data is a type used to define channels for execution type Data struct { output []byte error error @@ -54,7 +54,7 @@ func makePutRequest(link string, data *bytes.Buffer) string { // executeYaml is used to run a job like straight from the command line tool // parameters: filename (a string containing the path to the local file to execute), done (a boolean channel), store (a store of node information) // returns: nil -func executeYaml(filename string, done chan bool, httpAddr string, store fsm.Store) { +func executeYaml(repo repository.Repository, filename string, done chan bool, httpAddr string, store fsm.Store) { if exists(filename) { var job JobNode singleChannel := make(chan Data) @@ -67,7 +67,7 @@ func executeYaml(filename string, done chan bool, httpAddr string, store fsm.Sto gid, _ := strconv.Atoi(group.Gid) job.UID = uint32(uid) job.GID = uint32(gid) - go job.runCommand(singleChannel, httpAddr, store) + go job.runCommand(repo, singleChannel, httpAddr, store) res := <-singleChannel ReviewError(res.error, "bool") done <- true @@ -80,10 +80,10 @@ func executeYaml(filename string, done chan bool, httpAddr string, store fsm.Sto // executeLang is used to execute a file in /etc/odin/$id // parameters: contentsJSON (byte array containing uid, gid, language and file information), store (a store of node information) // returns: boolean (returns true if the file exists and is executed) -func executeLang(contentsJSON []byte, done chan bool, httpAddr string, store fsm.Store) { +func executeLang(repo repository.Repository, contentsJSON []byte, done chan bool, httpAddr string, store fsm.Store) { var queue Queue json.Unmarshal(contentsJSON, &queue) - go queue.BatchRun(httpAddr, store) + go queue.BatchRun(repo, httpAddr, store) go queue.UpdateRuns(httpAddr) done <- true return @@ -92,13 +92,13 @@ func executeLang(contentsJSON []byte, done chan bool, httpAddr string, store fsm // Execute is used to decide which of the executeLang and exectureYaml functions to use // parameters: contents (byte array containing uid, gid, language and file information), process (int used to decide the function to use in the code), httpAddr (an address string for the engine), store (a store of node information) // returns: boolean (returns true if one of the functions executes successfully, false otherwise) -func Execute(contents []byte, process int, httpAddr string, store fsm.Store) bool { +func Execute(repo repository.Repository, contents []byte, process int, httpAddr string, store fsm.Store) bool { done := make(chan bool) switch process { case 0: - go executeLang(contents, done, httpAddr, store) + go executeLang(repo, contents, done, httpAddr, store) case 1: - go executeYaml(string(contents), done, httpAddr, store) + go executeYaml(repo, string(contents), done, httpAddr, store) } return <-done } diff --git a/odin-engine/pkg/executor/queue.go b/odin-engine/pkg/executor/queue.go index 1852207..655957f 100644 --- a/odin-engine/pkg/executor/queue.go +++ b/odin-engine/pkg/executor/queue.go @@ -6,16 +6,17 @@ import ( "strconv" "github.com/theycallmemac/odin/odin-engine/pkg/fsm" + "github.com/theycallmemac/odin/odin-engine/pkg/repository" ) // BatchRun is called on a queue type and is used to run the batch loop to run all executions // parameters: store (a store of node information) // returns: nil -func (queue Queue) BatchRun(httpAddr string, store fsm.Store) { +func (queue Queue) BatchRun(repo repository.Repository, httpAddr string, store fsm.Store) { for _, job := range queue { go func(job JobNode) { channel := make(chan Data) - go job.runCommand(channel, httpAddr, store) + go job.runCommand(repo, channel, httpAddr, store) }(job) } } diff --git a/odin-engine/pkg/jobs/client.go b/odin-engine/pkg/jobs/client.go new file mode 100644 index 0000000..5e5cb35 --- /dev/null +++ b/odin-engine/pkg/jobs/client.go @@ -0,0 +1,104 @@ +package jobs + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log" + "os/user" + "strconv" + "strings" + + "github.com/lnquy/cron" + "github.com/theycallmemac/odin/odin-engine/pkg/fsm" + "github.com/theycallmemac/odin/odin-engine/pkg/repository" + "github.com/theycallmemac/odin/odin-engine/pkg/resources" + "github.com/theycallmemac/odin/odin-engine/pkg/types" + + "gopkg.in/yaml.v2" +) + +// URI is used to store the address to the MongoDB instance used by the Odin Engine +var URI = resources.UnmarsharlYaml(resources.ReadFileBytes(getHome() + "/odin-config.yml")).Mongo.Address + +// getHome is used to get the path to the user's home directory +// parameters: nil +// return string (the path to the user's home) +func getHome() string { + usr, _ := user.Current() + return usr.HomeDir +} + +// unmarsharlYaml is used to unmarshal YAML +// parameters: byteArray (an array of bytes representing the contents of a file) +// returns: Config (a struct form of the YAML) +func unmarsharlYaml(byteArray []byte) types.EngineConfig { + var cfg types.EngineConfig + err := yaml.Unmarshal([]byte(byteArray), &cfg) + if err != nil { + log.Fatalf("error: %v", err) + } + return cfg +} + +// Format is used to format the output of MongoDB stat contents +// parameters: id, description, valType, value (four strings corresponding to individual job stats) +// returns: string (a space formatted string used for display) +func Format(id string, description string, valType string, value string) string { + return fmt.Sprintf("%-20s%-20s%-20s%-20s\n", id, description, valType, value) +} + +// SchFormat is used to parse and format the output of the MongoDB schedule contents +// parameters: id, name, description, schedule (four strings corresponding to individual job data) +// returns: string (a space formatted string used for display) +func SchFormat(id string, name, string, description string, links string, schedule string) string { + var finalSchedule = "" + var tmpSchedule = "" + if schedule == "0 5 31 2 *" { + finalSchedule = "never" + } else if schedule != "SCHEDULE" { + scheduleArray := strings.Split(schedule, ",") + for i, item := range scheduleArray { + descriptor, _ := cron.NewDescriptor() + tmpSchedule, _ = descriptor.ToDescription(item, cron.Locale_en) + if i+1 == len(scheduleArray) { + finalSchedule += tmpSchedule + } else { + finalSchedule += tmpSchedule + " & " + } + } + } else { + finalSchedule = schedule + } + return fmt.Sprintf("%-20s%-20s%-20s%-20s%-20s\n", id, name, description, links, finalSchedule) +} + +// RunLinks is used to run jobs linked to a job which has just been executed +// parameters: links (a string array of Job ID's to execute), uid (a uint32 of that user's id), httpAddr (a string port of the master node), store (a fsm.Store containing information about other nodes) +// returns: nil +func RunLinks(repo repository.Repository, links []string, uid uint32, httpAddr string, store fsm.Store) { + ctx := context.Background() + var jobs []Node + var node Node + for _, link := range links { + id := string(link) + job, err := repo.GetJobById(ctx, id, fmt.Sprint(uid)) + if err != nil { + fmt.Printf("[FAILED] Failed to get job (%s) (%d) while running links", id, uid) + continue + } + node.ID, node.Lang, node.File, node.Links = job.ID, job.Language, job.File, job.Links + uid, _ := strconv.ParseUint(job.UID, 10, 32) + gid, _ := strconv.ParseUint(job.GID, 10, 32) + node.UID = uint32(uid) + node.GID = uint32(gid) + jobs = append(jobs, node) + } + var en ExecNode + jobsArray, _ := json.Marshal(jobs) + en.Items = jobsArray + en.Store = store + buffer, _ := json.Marshal(en) + go MakePostRequest("http://localhost"+httpAddr+"/execute", bytes.NewBuffer(buffer)) +} diff --git a/odin-engine/pkg/jobs/mongoClient.go b/odin-engine/pkg/jobs/mongoClient.go deleted file mode 100644 index a8a75fa..0000000 --- a/odin-engine/pkg/jobs/mongoClient.go +++ /dev/null @@ -1,316 +0,0 @@ -package jobs - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "log" - "os/user" - "strconv" - "strings" - "time" - - "github.com/lnquy/cron" - "github.com/theycallmemac/odin/odin-engine/pkg/fsm" - "github.com/theycallmemac/odin/odin-engine/pkg/resources" - "github.com/theycallmemac/odin/odin-engine/pkg/types" - - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/mongo/readpref" - - "gopkg.in/yaml.v2" -) - -// URI is used to store the address to the MongoDB instance used by the Odin Engine -var URI = resources.UnmarsharlYaml(resources.ReadFileBytes(getHome() + "/odin-config.yml")).Mongo.Address - -// NewJob is a type to be used for accessing and storing job information -type NewJob struct { - ID string `yaml:"id"` - UID string `yaml:"uid"` - GID string `yaml:"gid"` - Name string `yaml:"name"` - Description string `yaml:"description"` - Language string `yaml:"language"` - File string `yaml:"file"` - Stats string `yaml:"stats"` - Schedule string `yaml:"schedule"` - Runs int - Links string -} - -// JobStats is a type to be used for accessing and storing job stats information -type JobStats struct { - ID string - Description string - Type string - Value string -} - - -// getHome is used to get the path to the user's home directory -// parameters: nil -// return string (the path to the user's home) -func getHome() string { - usr, _ := user.Current() - return usr.HomeDir -} - -// unmarsharlYaml is used to unmarshal YAML -// parameters: byteArray (an array of bytes representing the contents of a file) -// returns: Config (a struct form of the YAML) -func unmarsharlYaml(byteArray []byte) types.EngineConfig { - var cfg types.EngineConfig - err := yaml.Unmarshal([]byte(byteArray), &cfg) - if err != nil { - log.Fatalf("error: %v", err) - } - return cfg -} - -// SetupClient is used to set up a MongoDB client and test it with a ping command -// parameters: nil -// returns: *mogno.Client (a client) -func SetupClient() (*mongo.Client, error) { - c := getMongoClient() - ctx, _ := context.WithTimeout(context.Background(), 2*time.Second) - err := c.Ping(ctx, readpref.Primary()) - if err != nil { - fmt.Println("Cannot connect to MongoDB: ", err) - c.Disconnect(context.TODO()) - } - return c, err -} - -// getMongoClient is used to get a MongoDB Client and set it's options -// parameters: none -// returns: *mogno.Client (a client) -func getMongoClient() *mongo.Client { - clientOptions := options.Client().ApplyURI(URI) - client, err := mongo.NewClient(clientOptions) - if err != nil { - log.Fatal(err) - } - err = client.Connect(context.Background()) - if err != nil { - log.Fatal(err) - } - return client -} - -// InsertIntoMongo is used to add information to the MongoDB instance -// parameters: client (a *mongo.Client), d (a byte array containing marshaled JSON), and path (a string to set as the new job.File) -// returns: interface{} (an interface on the insertion results) -func InsertIntoMongo(client *mongo.Client, d []byte, path string, uid string) string { - var job NewJob - json.Unmarshal(d, &job) - job.File = path - job.Runs = 0 - if string(GetJobByValue(client, bson.M{"id": string(job.ID)}, uid).ID) == string(job.ID) { - return "Job with ID: " + job.ID + " already exists\n" - } - collection := client.Database("odin").Collection("jobs") - _, err := collection.InsertOne(context.TODO(), job) - client.Disconnect(context.TODO()) - if err != nil { - log.Fatalln("Error on inserting new job", err) - } - return "Job: " + job.ID + " deployed successfully\n" -} - -// GetJobStats is used to retrieve the stats associated with each job from the MongoDB instance -// parameters: client (a *mongo.Client), id (a string representation of a job's id) -// returns: []JobStats (the collection of fetched job stats) -func GetJobStats(client *mongo.Client, id string) []JobStats { - var statMap map[string]string - var jobStats JobStats - var statsList []JobStats - collection := client.Database("odin").Collection("observability") - documents, _ := collection.Find(context.TODO(), bson.M{"id": id}) - client.Disconnect(context.TODO()) - for documents.Next(context.TODO()) { - documents.Decode(&statMap) - jobStats.ID = statMap["id"] - jobStats.Description = statMap["desc"] - jobStats.Type = statMap["type"] - jobStats.Value = statMap["value"] - statsList = append(statsList, jobStats) - } - return statsList -} - -// GetJobByValue is used to return a job in MongoDB by filtering on a certain value pertaining to that job -// parameters: client (a *mongo.Client), filter (a bson encoding of a job id), uid (a string of the user's ID) -// returns: NewJob (the fetched job) -func GetJobByValue(client *mongo.Client, filter bson.M, uid string) NewJob { - var job NewJob - collection := client.Database("odin").Collection("jobs") - documentReturned := collection.FindOne(context.TODO(), filter) - documentReturned.Decode(&job) - if job.UID == uid { - return job - } - var tmp NewJob - return tmp -} - -// GetUserJobs is used to return a specific user's jobs from MongoDB -// parameters: client (a *mongo.Client), uid (a string of that user's id) -// returns: []NewJob (all jobs in the Mongo instance) -func GetUserJobs(client *mongo.Client, uid string) []NewJob { - var jobs []NewJob - collection := client.Database("odin").Collection("jobs") - documents, _ := collection.Find(context.TODO(), bson.D{}) - client.Disconnect(context.TODO()) - for documents.Next(context.TODO()) { - var job NewJob - documents.Decode(&job) - if job.UID == uid || uid == "0" { - jobs = append(jobs, job) - } - } - return jobs -} - -// GetAll is used to return all jobs in MongoDB -// parameters: client (a *mongo.Client) -// returns: []NewJob (all jobs in the Mongo instance) -func GetAll(client *mongo.Client) []NewJob { - var jobs []NewJob - collection := client.Database("odin").Collection("jobs") - documents, _ := collection.Find(context.TODO(), bson.D{}) - client.Disconnect(context.TODO()) - for documents.Next(context.TODO()) { - var job NewJob - documents.Decode(&job) - jobs = append(jobs, job) - } - return jobs -} - -// Format is used to format the output of MongoDB stat contents -// parameters: id, description, valType, value (four strings corresponding to individual job stats) -// returns: string (a space formatted string used for display) -func Format(id string, description string, valType string, value string) string { - return fmt.Sprintf("%-20s%-20s%-20s%-20s\n", id, description, valType, value) -} - -// SchFormat is used to parse and format the output of the MongoDB schedule contents -// parameters: id, name, description, schedule (four strings corresponding to individual job data) -// returns: string (a space formatted string used for display) -func SchFormat(id string, name, string, description string, links string, schedule string) string { - var finalSchedule = "" - var tmpSchedule = "" - if schedule == "0 5 31 2 *" { - finalSchedule = "never" - } else if schedule != "SCHEDULE" { - scheduleArray := strings.Split(schedule, ",") - for i, item := range scheduleArray { - descriptor, _ := cron.NewDescriptor() - tmpSchedule, _ = descriptor.ToDescription(item, cron.Locale_en) - if i+1 == len(scheduleArray) { - finalSchedule += tmpSchedule - } else { - finalSchedule += tmpSchedule + " & " - } - } - } else { - finalSchedule = schedule - } - return fmt.Sprintf("%-20s%-20s%-20s%-20s%-20s\n", id, name, description, links, finalSchedule) -} - -// UpdateJobByValue is used to modify a job in MongoDB -// parameters: client (a *mongo.Client), job (a NewJob structure) -// returns: int64 (value of the number of entries modified) -func UpdateJobByValue(client *mongo.Client, job NewJob) int64 { - update := bson.M{"$set": bson.M{"name": job.Name, "description": job.Description, "schedule": job.Schedule, "runs": job.Runs}} - collection := client.Database("odin").Collection("jobs") - updateResult, err := collection.UpdateOne(context.TODO(), bson.M{"id": job.ID}, update) - client.Disconnect(context.TODO()) - if err != nil { - return int64(0) - } - return updateResult.ModifiedCount -} - -// DeleteJobByValue is used to delete a job in MongoDB -// parameters: parameters: client (a *mongo.Client), filter (a bson encoding of a job id), uid (a string of the user's ID) -// returns: bool (whether a job was deleted or not) -func DeleteJobByValue(client *mongo.Client, filter bson.M, uid string) bool { - job := GetJobByValue(client, filter, uid) - if job.ID == "" || job.UID != uid { - return false - } - collection := client.Database("odin").Collection("jobs") - _, err := collection.DeleteOne(context.TODO(), filter) - client.Disconnect(context.TODO()) - if err != nil { - return false - } - return true -} - -// AddJobLink is used to add links the job is associated with -// parameters: client (a *mongo.Client), from (a string of a job ID to give a new link), to (a string of a job ID to create a link to), uid (a string of the user's ID) -// returns: int64 (value of the number of entries modified) -func AddJobLink(client *mongo.Client, from string, to string, uid string) int64 { - job := GetJobByValue(client, bson.M{"id": string(from)}, uid) - if strings.Contains(job.Links, to) { - return 0 - } - job.Links = job.Links + to + "," - update := bson.M{"$set": bson.M{"links": job.Links}} - collection := client.Database("odin").Collection("jobs") - updateResult, _ := collection.UpdateOne(context.TODO(), bson.M{"id": from}, update) - client.Disconnect(context.TODO()) - return updateResult.ModifiedCount -} - -// DeleteJobLink is used to delete links the job is associated with -// parameters: client (a *mongo.Client), from (a string of a job ID to remove a link from), to (a string of a job ID to remove), uid (a string of the user's ID) -// returns: int64 (value of the number of entries modified) -func DeleteJobLink(client *mongo.Client, from string, to string, uid string) int64 { - var newLinks string - job := GetJobByValue(client, bson.M{"id": string(from)}, uid) - links := strings.Split(job.Links, ",") - for _, link := range links { - if link != to && link != "" { - newLinks = newLinks + link + "," - } - } - update := bson.M{"$set": bson.M{"links": newLinks}} - collection := client.Database("odin").Collection("jobs") - updateResult, _ := collection.UpdateOne(context.TODO(), bson.M{"id": job.ID}, update) - client.Disconnect(context.TODO()) - return updateResult.ModifiedCount -} - - -// RunLinks is used to run jobs linked to a job which has just been executed -// parameters: links (a string array of Job ID's to execute), uid (a uint32 of that user's id), httpAddr (a string port of the master node), store (a fsm.Store containing information about other nodes) -// returns: nil -func RunLinks(links []string, uid uint32, httpAddr string, store fsm.Store) { - client, _ := SetupClient() - var jobs []Node - var node Node - for _, link := range links { - job := GetJobByValue(client, bson.M{"id": string(link)}, fmt.Sprint(uid)) - node.ID, node.Lang, node.File, node.Links = job.ID, job.Language, job.File, job.Links - uid, _ := strconv.ParseUint(job.UID, 10, 32) - gid, _ := strconv.ParseUint(job.GID, 10, 32) - node.UID = uint32(uid) - node.GID = uint32(gid) - jobs = append(jobs, node) - } - client.Disconnect(context.TODO()) - var en ExecNode - jobsArray, _ := json.Marshal(jobs) - en.Items = jobsArray - en.Store = store - buffer, _ := json.Marshal(en) - go MakePostRequest("http://localhost"+httpAddr+"/execute", bytes.NewBuffer(buffer)) -} diff --git a/odin-engine/pkg/jobs/setup.go b/odin-engine/pkg/jobs/setup.go index b7b1c53..f4b9bf8 100644 --- a/odin-engine/pkg/jobs/setup.go +++ b/odin-engine/pkg/jobs/setup.go @@ -7,9 +7,11 @@ import ( "os/user" "strconv" "strings" + + "github.com/theycallmemac/odin/odin-engine/pkg/repository" ) -var job NewJob +var job repository.Job // GID is used to store the group id associated with a specific group var GID int diff --git a/odin-engine/pkg/jobs/ticker.go b/odin-engine/pkg/jobs/ticker.go index 0b9b8cb..420d797 100644 --- a/odin-engine/pkg/jobs/ticker.go +++ b/odin-engine/pkg/jobs/ticker.go @@ -2,6 +2,7 @@ package jobs import ( "bytes" + "context" "encoding/json" "fmt" "io/ioutil" @@ -14,6 +15,7 @@ import ( "github.com/gorhill/cronexpr" "github.com/theycallmemac/odin/odin-engine/pkg/fsm" + "github.com/theycallmemac/odin/odin-engine/pkg/repository" ) // Queue is a type used to access an array of Nodes @@ -40,7 +42,7 @@ type ExecNode struct { // storage is a var used store the last known Queue var storage struct { - Items []NewJob + Items []repository.Job } // dead is a var used to check MongoDB @@ -149,7 +151,7 @@ func cronToSeconds(cronTime string) []int { // fillQueue is used to fill the queue, calling sorting and grouping methods before checking the head// parameters: t (the time interval between each execution of the fillQueue function) // parameters: jobs (an unsorted array of Jobs), httpAddr (an address string from the server), store (a fsm containing node information) // returns: []Node the sorted array of jobs -func fillQueue(jobs []NewJob, httpAddr string, store fsm.Store) []Node { +func fillQueue(jobs []repository.Job, httpAddr string, store fsm.Store) []Node { var queue Queue var node Node for count, j := range jobs { @@ -176,16 +178,15 @@ func fillQueue(jobs []NewJob, httpAddr string, store fsm.Store) []Node { // startQueuing is used to start the queueing process // parameters: t (the time interval between each execution of the fillQueue function), httpAddr (an address string from the server), store (a fsm containing node information) // returns: nil -func startQueuing(t time.Time, httpAddr string, store fsm.Store) { +func startQueuing(repo repository.Repository, t time.Time, httpAddr string, store fsm.Store) { if dead { fillQueue(storage.Items, httpAddr, store) } else { - client, err := SetupClient() + jobs, err := repo.GetAll(context.Background()) if err != nil { dead = true } else { dead = false - jobs := GetAll(client) fillQueue(jobs, httpAddr, store) storage.Items = jobs } @@ -195,15 +196,21 @@ func startQueuing(t time.Time, httpAddr string, store fsm.Store) { // doEvery is used to execute the fillQueue function every second // parameters: d (the duration between execution of fillQueue), f (the function to execute - in this case it's startingQueue), store (a fsm containing node information), httpAddr (an address string from the server) // returns: nil -func doEvery(d time.Duration, f func(time.Time, string, fsm.Store), store fsm.Store, httpAddr string) { +func doEvery( + d time.Duration, + f func(repository.Repository, time.Time, string, fsm.Store), + repo repository.Repository, + store fsm.Store, + httpAddr string) { + for x := range time.Tick(d) { - go f(x, httpAddr, store) + go f(repo, x, httpAddr, store) } } // StartTicker starts the countdown process, specifying the parameters of execution for doEvery // parameters: store (a fsm containing node information), httpAddr (an address string from the server) // returns: nil -func StartTicker(store fsm.Store, httpAddr string) { - go doEvery(1000*time.Millisecond, startQueuing, store, httpAddr) +func StartTicker(repo repository.Repository, store fsm.Store, httpAddr string) { + go doEvery(1000*time.Millisecond, startQueuing, repo, store, httpAddr) } diff --git a/odin-engine/pkg/jobs/ticker_test.go b/odin-engine/pkg/jobs/ticker_test.go index bd244ca..795780a 100644 --- a/odin-engine/pkg/jobs/ticker_test.go +++ b/odin-engine/pkg/jobs/ticker_test.go @@ -7,11 +7,12 @@ import ( "time" "github.com/theycallmemac/odin/odin-engine/pkg/fsm" + "github.com/theycallmemac/odin/odin-engine/pkg/repository" ) var unsorted Queue var sorted Queue -var jobs []NewJob +var jobs []repository.Job func TestSortQueue(t *testing.T) { node1 := Node{Schedule: []int{80}} diff --git a/odin-engine/pkg/repository/nosql/nosql.go b/odin-engine/pkg/repository/nosql/nosql.go index 034de41..f6e4dcd 100644 --- a/odin-engine/pkg/repository/nosql/nosql.go +++ b/odin-engine/pkg/repository/nosql/nosql.go @@ -2,6 +2,10 @@ package nosql import ( "context" + "encoding/json" + "errors" + "fmt" + "strings" "github.com/hidal-go/hidalgo/legacy/nosql/mongo" @@ -27,30 +31,188 @@ func init() { }) } +var ( + ErrInvalidArgument = errors.New("Invalid argument") +) + +// Repository is implementation of Repository interface type Repository struct { db nosql.Database } func (repo *Repository) ensureIndex(ctx context.Context) error { - return repo.db.EnsureIndex( + if err := repo.db.EnsureIndex( ctx, base.ObservabilityTable, nosql.Index{ - Fields: []string{"_id"}, + Fields: []string{"id"}, + Type: nosql.StringExact, + }, + nil, + ); err != nil { + return err + } + + if err := repo.db.EnsureIndex( + ctx, + base.JobTable, + nosql.Index{ + Fields: []string{"id"}, Type: nosql.StringExact, }, - []nosql.Index{ - { - Fields: []string{"id"}, - Type: nosql.StringExact, - }, + nil, + ); err != nil { + return err + } + + return nil +} + +// CreateJob creates a new job for an user +func (repo *Repository) CreateJob(ctx context.Context, data []byte, path string, uid string) (string, error) { + job := &base.Job{} + if err := json.Unmarshal(data, job); err != nil { + return "", err + } + if job.ID == "" { + return "", ErrInvalidArgument + } else if _, err := repo.GetJobById(ctx, job.ID, uid); err == nil { + return "", fmt.Errorf("job with id %s exists", job.ID) + } else if err != nosql.ErrNotFound { + return "", err + } + job.File = path + job.Runs = 0 + doc := marshalJob(job) + _, err := repo.db.Insert(ctx, base.JobTable, []string{job.ID}, doc) + return job.ID, err +} + +// GetJobById returns a job by filtering on a certain value pertaining to that job +func (repo *Repository) GetJobById(ctx context.Context, id string, uid string) (*base.Job, error) { + if id == "" { + return nil, ErrInvalidArgument + } + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(id), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(uid), + }, + ).One(ctx) + + if err != nil { + return nil, err + } + + job := &base.Job{} + unmarshalJob(doc, job) + return job, nil +} + +// GetUserJobs returns all jobs belonging to an user +func (repo *Repository) GetUserJobs(ctx context.Context, uid string) ([]*base.Job, error) { + if uid == "" { + return nil, ErrInvalidArgument + } + iter := repo.db.Query(base.JobTable).WithFields(nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(uid), + }).Iterate() + + if iter.Err() != nil { + return nil, iter.Err() + } + defer iter.Close() + + results := make([]*base.Job, 0) + for iter.Next(ctx) { + doc := iter.Doc() + job := &base.Job{} + unmarshalJob(doc, job) + results = append(results, job) + } + return results, nil +} + +// GetAll returns all jobs +func (repo *Repository) GetAll(ctx context.Context) ([]base.Job, error) { + iter := repo.db.Query(base.JobTable).Iterate() + if iter.Err() != nil { + return nil, iter.Err() + } + defer iter.Close() + + results := make([]base.Job, 0) + for iter.Next(ctx) { + doc := iter.Doc() + job := base.Job{} + unmarshalJob(doc, &job) + results = append(results, job) + } + return results, nil +} + +// UpdateJob modifies a job +func (repo *Repository) UpdateJob(ctx context.Context, job *base.Job) error { + if job.ID == "" { + return ErrInvalidArgument + } + key := []string{job.ID} + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(job.ID), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(job.UID), }, - ) + ).One(ctx) + if err != nil { + return err + } + doc["name"] = nosql.String(job.Name) + doc["description"] = nosql.String(job.Description) + doc["schedule"] = nosql.String(job.Schedule) + doc["runs"] = nosql.Int(job.Runs) + return repo.db.Update(base.JobTable, key).Upsert(doc).Do(ctx) } -func (repo *Repository) GetJobStats(ctx context.Context, id string) ([]base.JobStats, error) { +// DeleteJob deletes an user's job +func (repo *Repository) DeleteJob(ctx context.Context, id string, uid string) error { + if id == "" { + return ErrInvalidArgument + } + return repo.db.Delete(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(id), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(uid), + }, + ).Do(ctx) +} + +// GetJobStats returns stats of a job given the job id +func (repo *Repository) GetJobStats(ctx context.Context, id string) ([]*base.JobStats, error) { + if id == "" { + return nil, ErrInvalidArgument + } iter := repo.db.Query(base.ObservabilityTable).WithFields(nosql.FieldFilter{ - Path: []string{"id"}, + Path: []string{"_id"}, Filter: nosql.Equal, Value: nosql.String(id), }).Iterate() @@ -60,20 +222,143 @@ func (repo *Repository) GetJobStats(ctx context.Context, id string) ([]base.JobS } defer iter.Close() - results := make([]base.JobStats, 0) + results := make([]*base.JobStats, 0) for iter.Next(ctx) { doc := iter.Doc() - jobStats := base.JobStats{ - ID: string(doc["id"].(nosql.String)), - Description: string(doc["desc"].(nosql.String)), - Type: string(doc["type"].(nosql.String)), - Value: string(doc["value"].(nosql.String)), - } + jobStats := &base.JobStats{} + unmarshalJobStats(doc, jobStats) results = append(results, jobStats) } return results, nil } +// CreateJobStats creates a new job for an user +func (repo *Repository) CreateJobStats(ctx context.Context, js *base.JobStats) error { + if js.ID == "" { + return ErrInvalidArgument + } else if _, err := repo.GetJobStats(ctx, js.ID); err == nil { + return fmt.Errorf("job stats with id %s exists", js.ID) + } else if err != nosql.ErrNotFound { + return err + } + doc := marshalJobStats(js) + _, err := repo.db.Insert(ctx, base.JobTable, []string{js.ID}, doc) + return err +} + +// AddJobLink is used to add links the job is associated with +func (repo *Repository) AddJobLink(ctx context.Context, from string, to string, uid string) error { + if from == "" || to == "" { + return ErrInvalidArgument + } + key := []string{from} + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(from), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(uid), + }, + ).One(ctx) + if err != nil { + return err + } + doc["links"] = nosql.String(string(doc["links"].(nosql.String)) + to + ",") + return repo.db.Update(base.JobTable, key).Upsert(doc).Do(ctx) +} + +// DeleteJobLink is used to delete links the job is associated with +func (repo *Repository) DeleteJobLink(ctx context.Context, from string, to string, uid string) error { + if from == "" || to == "" { + return ErrInvalidArgument + } + key := []string{from} + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(from), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(uid), + }, + ).One(ctx) + if err != nil { + return err + } + links := strings.Split(string(doc["links"].(nosql.String)), ",") + newLinks := "" + for _, link := range links { + if link != to && link != "" { + newLinks = newLinks + link + "," + } + } + doc["links"] = nosql.String(newLinks) + return repo.db.Update(base.JobTable, key).Upsert(doc).Do(ctx) +} + +// Close closes db connection func (repo *Repository) Close() error { return repo.db.Close() } + +func unmarshalJob(doc nosql.Document, job *base.Job) { + job.ID = valueToString(doc["id"]) + job.UID = valueToString(doc["uid"]) + job.GID = valueToString(doc["gid"]) + job.Name = valueToString(doc["name"]) + job.Description = valueToString(doc["description"]) + job.Language = valueToString(doc["language"]) + job.File = valueToString(doc["file"]) + job.Stats = valueToString(doc["stats"]) + job.Schedule = valueToString(doc["schedule"]) + job.Runs = int(doc["runs"].(nosql.Int)) + job.Links = valueToString(doc["links"]) +} + +func marshalJob(job *base.Job) nosql.Document { + return nosql.Document{ + "id": nosql.String(job.ID), + "uid": nosql.String(job.UID), + "gid": nosql.String(job.GID), + "name": nosql.String(job.Name), + "description": nosql.String(job.Description), + "language": nosql.String(job.Language), + "file": nosql.String(job.File), + "stats": nosql.String(job.Stats), + "schedule": nosql.String(job.Schedule), + "runs": nosql.Int(job.Runs), + "links": nosql.String(job.Links), + } +} + +func unmarshalJobStats(doc nosql.Document, js *base.JobStats) { + js.ID = valueToString(doc["id"]) + js.Description = valueToString(doc["desc"]) + js.Type = valueToString(doc["type"]) + js.Value = valueToString(doc["value"]) + js.Timestamp = valueToString(doc["timestamp"]) +} + +func marshalJobStats(js *base.JobStats) nosql.Document { + return nosql.Document{ + "id": nosql.String(js.ID), + "desc": nosql.String(js.Description), + "type": nosql.String(js.Type), + "value": nosql.String(js.Value), + "timestamp": nosql.String(js.Timestamp), + } +} + +func valueToString(val nosql.Value) string { + if val == nil { + return "" + } + return string(val.(nosql.String)) +} diff --git a/odin-engine/pkg/repository/nosql/nosql_test.go b/odin-engine/pkg/repository/nosql/nosql_test.go index 01dc560..fa1e06d 100644 --- a/odin-engine/pkg/repository/nosql/nosql_test.go +++ b/odin-engine/pkg/repository/nosql/nosql_test.go @@ -2,6 +2,7 @@ package nosql import ( "context" + "errors" "testing" "github.com/hidal-go/hidalgo/legacy/nosql" @@ -17,7 +18,7 @@ import ( // use mongo as default nosql test db func testRepo(t *testing.T) *Repository { // TODO: pass mongo url as env variable instead - db, err := mongo.Dial("localhost:27017", base.DefaultDatabase, nosql.Options(nil)) + db, err := mongo.Dial("localhost:27017", base.DefaultDatabase, nil) require.NoError(t, err) return &Repository{ db: db, @@ -32,9 +33,887 @@ func cleanUp(t *testing.T, ctx context.Context) { client.Connect(ctx) err = client.Database(base.DefaultDatabase).Collection(base.ObservabilityTable).Drop(ctx) require.NoError(t, err) + err = client.Database(base.DefaultDatabase).Collection(base.JobTable).Drop(ctx) require.NoError(t, client.Disconnect(ctx)) } +func TestRepository_CreateJob(t *testing.T) { + type args struct { + ctx context.Context + data []byte + path string + uid string + } + tests := []struct { + name string + existing []*base.Job + args args + want *base.Job + wantId string + wantErr error + }{ + { + name: "Simple success", + existing: nil, + args: args{ + ctx: context.Background(), + data: []byte(`{"id":"1", "uid": "2", "name": "sample", "description": "job desc"}`), + path: "sample.yaml", + uid: "2", + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + File: "sample.yaml", + }, + wantId: "1", + wantErr: nil, + }, + { + name: "Exist", + existing: []*base.Job{ + { + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + File: "sample.yaml", + }, + }, + args: args{ + ctx: context.Background(), + data: []byte(`{"id":"1", "uid": "2", "name": "sample", "description": "job desc"}`), + path: "sample.yaml", + uid: "2", + }, + want: nil, + wantId: "", + wantErr: errors.New("job with id 1 exists"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + for _, e := range tt.existing { + _, err := repo.db.Insert(tt.args.ctx, base.JobTable, []string{e.ID}, marshalJob(e)) + require.NoError(t, err) + } + id, err := repo.CreateJob(tt.args.ctx, tt.args.data, tt.args.path, tt.args.uid) + if tt.wantErr == nil { + require.NoError(t, err) + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(tt.want.ID), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(tt.want.UID), + }, + ).One(tt.args.ctx) + require.NoError(t, err) + job := &base.Job{} + unmarshalJob(doc, job) + assert.Equal(t, tt.want, job) + assert.Equal(t, tt.wantId, id) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + +func TestRepository_GetJobById(t *testing.T) { + type args struct { + ctx context.Context + id string + uid string + } + tests := []struct { + name string + existing []*base.Job + args args + want *base.Job + wantErr error + }{ + { + name: "Simple success", + existing: []*base.Job{ + { + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + }, + }, + args: args{ + ctx: context.Background(), + id: "1", + uid: "2", + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + }, + wantErr: nil, + }, + { + name: "Not exist", + existing: nil, + args: args{ + ctx: context.Background(), + id: "1", + uid: "2", + }, + want: nil, + wantErr: nosql.ErrNotFound, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + for _, e := range tt.existing { + _, err := repo.db.Insert(tt.args.ctx, base.JobTable, []string{e.ID}, marshalJob(e)) + require.NoError(t, err) + } + job, err := repo.GetJobById(tt.args.ctx, tt.args.id, tt.args.uid) + if tt.wantErr == nil { + require.NoError(t, err) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + assert.Equal(t, tt.want, job) + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + +func TestRepository_GetUserJobs(t *testing.T) { + type args struct { + ctx context.Context + uid string + } + tests := []struct { + name string + existing []*base.Job + args args + want []*base.Job + wantErr error + }{ + { + name: "Simple success", + existing: []*base.Job{ + { + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + }, + { + ID: "2", + UID: "2", + Name: "sample", + Description: "job desc", + }, + { + ID: "3", + UID: "3", + Name: "sample", + Description: "job desc", + }, + }, + args: args{ + ctx: context.Background(), + uid: "2", + }, + want: []*base.Job{ + { + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + }, + { + ID: "2", + UID: "2", + Name: "sample", + Description: "job desc", + }, + }, + wantErr: nil, + }, + { + name: "Not exist", + existing: nil, + args: args{ + ctx: context.Background(), + uid: "2", + }, + want: []*base.Job{}, + wantErr: nil, + }, + { + name: "No user jobs", + existing: []*base.Job{ + { + ID: "1", + UID: "3", + Description: "sample", + }, + }, + args: args{ + ctx: context.Background(), + uid: "2", + }, + want: []*base.Job{}, + wantErr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + for _, e := range tt.existing { + _, err := repo.db.Insert(tt.args.ctx, base.JobTable, []string{e.ID}, marshalJob(e)) + require.NoError(t, err) + } + jobs, err := repo.GetUserJobs(tt.args.ctx, tt.args.uid) + if tt.wantErr == nil { + require.NoError(t, err) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + assert.Equal(t, tt.want, jobs) + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + +func TestRepository_GetAll(t *testing.T) { + type args struct { + ctx context.Context + } + tests := []struct { + name string + args args + want []base.Job + wantErr error + }{ + { + name: "Simple success 1", + want: []base.Job{ + { + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + }, + { + ID: "2", + UID: "2", + Name: "sample", + Description: "job desc", + }, + { + ID: "3", + UID: "3", + Name: "sample", + Description: "job desc", + }, + }, + args: args{ + ctx: context.Background(), + }, + wantErr: nil, + }, + { + name: "Simple success 2", + want: []base.Job{ + { + ID: "1", + UID: "3", + Description: "sample", + }, + }, + args: args{ + ctx: context.Background(), + }, + wantErr: nil, + }, + { + name: "Not exist", + args: args{ + ctx: context.Background(), + }, + want: []base.Job{}, + wantErr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + for _, e := range tt.want { + _, err := repo.db.Insert(tt.args.ctx, base.JobTable, []string{e.ID}, marshalJob(&e)) + require.NoError(t, err) + } + jobs, err := repo.GetAll(tt.args.ctx) + if tt.wantErr == nil { + require.NoError(t, err) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + assert.Equal(t, tt.want, jobs) + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + +func TestRepository_UpdateJob(t *testing.T) { + type args struct { + ctx context.Context + job *base.Job + } + tests := []struct { + name string + existing *base.Job + args args + want *base.Job + count int + wantErr error + }{ + { + name: "Simple success", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + }, + args: args{ + ctx: context.Background(), + job: &base.Job{ + ID: "1", + UID: "2", + Name: "changed", + Description: "changed", + Schedule: "weekly", + Runs: 1, + }, + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "changed", + Description: "changed", + Schedule: "weekly", + Runs: 1, + }, + count: 1, + wantErr: nil, + }, + { + name: "Not exist", + existing: nil, + args: args{ + ctx: context.Background(), + job: &base.Job{ + ID: "1", + UID: "2", + Name: "changed", + Description: "changed", + Schedule: "weekly", + Runs: 1, + }, + }, + want: nil, + wantErr: nosql.ErrNotFound, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + if tt.existing != nil { + _, err := repo.db.Insert(tt.args.ctx, base.JobTable, []string{tt.existing.ID}, marshalJob(tt.existing)) + require.NoError(t, err) + } + err := repo.UpdateJob(tt.args.ctx, tt.args.job) + if tt.wantErr == nil { + require.NoError(t, err) + it := repo.db.Query(base.JobTable).Iterate() + require.NoError(t, it.Err()) + c := 0 + for it.Next(tt.args.ctx) { + c++ + } + assert.Equal(t, tt.count, c) + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(tt.existing.ID), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(tt.existing.UID), + }, + ).One(tt.args.ctx) + require.NoError(t, err) + job := &base.Job{} + unmarshalJob(doc, job) + assert.Equal(t, tt.want, job) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + +func TestRepository_DeleteJob(t *testing.T) { + type args struct { + ctx context.Context + id string + uid string + } + tests := []struct { + name string + existing *base.Job + args args + count int + wantErr error + }{ + { + name: "Simple success", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + }, + args: args{ + ctx: context.Background(), + id: "1", + uid: "2", + }, + count: 0, + wantErr: nil, + }, + { + name: "Not matched", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + }, + args: args{ + ctx: context.Background(), + id: "2", + uid: "3", + }, + count: 1, + wantErr: nil, + }, + { + name: "Not exist", + existing: nil, + args: args{ + ctx: context.Background(), + id: "1", + uid: "2", + }, + count: 0, + wantErr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + var err error + if tt.existing != nil { + _, err = repo.db.Insert(tt.args.ctx, base.JobTable, []string{tt.existing.ID}, marshalJob(tt.existing)) + } + require.NoError(t, err) + err = repo.DeleteJob(tt.args.ctx, tt.args.id, tt.args.uid) + if tt.wantErr == nil { + require.NoError(t, err) + it := repo.db.Query(base.JobTable).Iterate() + require.NoError(t, it.Err()) + c := 0 + for it.Next(tt.args.ctx) { + c++ + } + assert.Equal(t, tt.count, c) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + +func TestRepository_AddJobLink(t *testing.T) { + type args struct { + ctx context.Context + from string + to string + uid string + } + tests := []struct { + name string + existing *base.Job + args args + want *base.Job + wantErr error + }{ + { + name: "No existing links", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + }, + args: args{ + ctx: context.Background(), + from: "1", + to: "2", + uid: "2", + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "2,", + }, + wantErr: nil, + }, + { + name: "Existing links", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "2,", + }, + args: args{ + ctx: context.Background(), + from: "1", + to: "3", + uid: "2", + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "2,3,", + }, + wantErr: nil, + }, + { + name: "No job", + existing: nil, + args: args{ + ctx: context.Background(), + from: "1", + to: "2", + uid: "2", + }, + want: nil, + wantErr: nosql.ErrNotFound, + }, + { + name: "Invalid argument", + existing: nil, + args: args{ + ctx: context.Background(), + from: "", + to: "2", + uid: "2", + }, + want: nil, + wantErr: ErrInvalidArgument, + }, + { + name: "Invalid argument 2", + existing: nil, + args: args{ + ctx: context.Background(), + from: "1", + to: "", + uid: "2", + }, + want: nil, + wantErr: ErrInvalidArgument, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + if tt.existing != nil { + _, err := repo.db.Insert(tt.args.ctx, base.JobTable, []string{tt.existing.ID}, marshalJob(tt.existing)) + require.NoError(t, err) + } + err := repo.AddJobLink(tt.args.ctx, tt.args.from, tt.args.to, tt.args.uid) + if tt.wantErr == nil { + require.NoError(t, err) + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(tt.existing.ID), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(tt.existing.UID), + }, + ).One(tt.args.ctx) + require.NoError(t, err) + job := &base.Job{} + unmarshalJob(doc, job) + assert.Equal(t, tt.want, job) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + +func TestRepository_DeleteJobLink(t *testing.T) { + type args struct { + ctx context.Context + from string + to string + uid string + } + tests := []struct { + name string + existing *base.Job + args args + want *base.Job + wantErr error + }{ + { + name: "No existing links", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "", + }, + args: args{ + ctx: context.Background(), + from: "1", + to: "2", + uid: "2", + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "", + }, + wantErr: nil, + }, + { + name: "Existing links", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "2,", + }, + args: args{ + ctx: context.Background(), + from: "1", + to: "2", + uid: "2", + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "", + }, + wantErr: nil, + }, + { + name: "Existing links 2", + existing: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "2,3,", + }, + args: args{ + ctx: context.Background(), + from: "1", + to: "2", + uid: "2", + }, + want: &base.Job{ + ID: "1", + UID: "2", + Name: "sample", + Description: "job desc", + Schedule: "daily", + Runs: 0, + Links: "3,", + }, + wantErr: nil, + }, + { + name: "No job", + existing: nil, + args: args{ + ctx: context.Background(), + from: "1", + to: "2", + uid: "2", + }, + want: nil, + wantErr: nosql.ErrNotFound, + }, + { + name: "Invalid argument", + existing: nil, + args: args{ + ctx: context.Background(), + from: "", + to: "2", + uid: "2", + }, + want: nil, + wantErr: ErrInvalidArgument, + }, + { + name: "Invalid argument 2", + existing: nil, + args: args{ + ctx: context.Background(), + from: "1", + to: "", + uid: "2", + }, + want: nil, + wantErr: ErrInvalidArgument, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cleanUp(t, tt.args.ctx) + repo := testRepo(t) + require.NoError(t, repo.ensureIndex(tt.args.ctx)) + + if tt.existing != nil { + _, err := repo.db.Insert(tt.args.ctx, base.JobTable, []string{tt.existing.ID}, marshalJob(tt.existing)) + require.NoError(t, err) + } + err := repo.DeleteJobLink(tt.args.ctx, tt.args.from, tt.args.to, tt.args.uid) + if tt.wantErr == nil { + require.NoError(t, err) + doc, err := repo.db.Query(base.JobTable).WithFields( + nosql.FieldFilter{ + Path: []string{"_id"}, + Filter: nosql.Equal, + Value: nosql.String(tt.existing.ID), + }, + nosql.FieldFilter{ + Path: []string{"uid"}, + Filter: nosql.Equal, + Value: nosql.String(tt.existing.UID), + }, + ).One(tt.args.ctx) + require.NoError(t, err) + job := &base.Job{} + unmarshalJob(doc, job) + assert.Equal(t, tt.want, job) + } else { + require.EqualError(t, err, tt.wantErr.Error()) + } + + cleanUp(t, tt.args.ctx) + require.NoError(t, repo.Close()) + }) + } +} + func TestRepository_GetJobStats(t *testing.T) { type args struct { ctx context.Context @@ -42,14 +921,14 @@ func TestRepository_GetJobStats(t *testing.T) { } tests := []struct { name string - existing []base.JobStats + existing []*base.JobStats args args - want []base.JobStats + want []*base.JobStats wantErr error }{ { name: "Simple success", - existing: []base.JobStats{ + existing: []*base.JobStats{ { ID: "1", Description: "sample stats", @@ -61,7 +940,7 @@ func TestRepository_GetJobStats(t *testing.T) { ctx: context.Background(), id: "1", }, - want: []base.JobStats{ + want: []*base.JobStats{ { ID: "1", Description: "sample stats", @@ -78,7 +957,7 @@ func TestRepository_GetJobStats(t *testing.T) { require.NoError(t, repo.ensureIndex(tt.args.ctx)) for _, e := range tt.existing { - _, err := repo.db.Insert(tt.args.ctx, base.ObservabilityTable, nil, nosql.Document{ + _, err := repo.db.Insert(tt.args.ctx, base.ObservabilityTable, []string{e.ID}, nosql.Document{ "id": nosql.String(e.ID), "desc": nosql.String(e.Description), "type": nosql.String(e.Type), diff --git a/odin-engine/pkg/repository/repository.go b/odin-engine/pkg/repository/repository.go index 739aa41..f911998 100644 --- a/odin-engine/pkg/repository/repository.go +++ b/odin-engine/pkg/repository/repository.go @@ -8,6 +8,7 @@ import ( const ( DefaultDatabase = "odin" ObservabilityTable = "observability" + JobTable = "jobs" ) var registry = make(map[string]*Registration) @@ -34,14 +35,61 @@ func GetRegistration(name string) (*Registration, error) { return reg, nil } +// Job is a type to be used for accessing and storing job information +type Job struct { + ID string `yaml:"id"` + UID string `yaml:"uid"` + GID string `yaml:"gid"` + Name string `yaml:"name"` + Description string `yaml:"description"` + Language string `yaml:"language"` + File string `yaml:"file"` + Stats string `yaml:"stats"` + Schedule string `yaml:"schedule"` + Runs int + Links string +} + +// JobStats is a type to be used for accessing and storing job stats information type JobStats struct { ID string Description string Type string Value string + Timestamp string } type Repository interface { - GetJobStats(ctx context.Context, id string) ([]JobStats, error) + // CreateJob creates a new job for an user + CreateJob(ctx context.Context, data []byte, path string, uid string) (string, error) + + // GetJobById returns a job by job id and user id + GetJobById(ctx context.Context, id string, uid string) (*Job, error) + + // GetUserJobs returns all jobs belonging to an user + GetUserJobs(ctx context.Context, uid string) ([]*Job, error) + + // GetAll returns all jobs + GetAll(ctx context.Context) ([]Job, error) + + // UpdateJob modifies a job + UpdateJob(ctx context.Context, job *Job) error + + // DeleteJob deletes an user's job + DeleteJob(ctx context.Context, id string, uid string) error + + // AddJobLink is used to add links the job is associated with + AddJobLink(ctx context.Context, from string, to string, uid string) error + + // DeleteJobLink is used to delete links the job is associated with + DeleteJobLink(ctx context.Context, from string, to string, uid string) error + + // GetJobStats returns stats of a job given the job id + GetJobStats(ctx context.Context, id string) ([]*JobStats, error) + + // CreateJobStats create new job stats + CreateJobStats(ctx context.Context, js *JobStats) error + + // Close closes db connection Close() error }