Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cmd/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ var (
cmdCwdMatters bool
cmdChangeHome bool
cmdRepGroup string
cmdGroup string
cmdLimitGroups string
cmdModules string
cmdDepGroups string
Expand Down Expand Up @@ -175,6 +176,9 @@ a flag to disable this check: --disable_relative_check.
the $HOME environment variable to the actual command working directory before
running the cmd.

"group" specifies which unix group the command should run as; if no value is
set, the users default unix group is used.

"on_failure" determines what behaviours are triggered if your cmd exits non-0.
Behaviours are described using an array of objects, where each object has a key
corresponding to the name of the desired behaviour, and the relevant value. The
Expand Down Expand Up @@ -568,7 +572,7 @@ func init() {
addCmd.Flags().BoolVar(&cmdBsubMode, "bsub", false, "enable bsub emulation mode")
addCmd.Flags().BoolVar(&cmdDisableRelativeCheck, "disable_relative_check", false,
"disable the relative path checking when cwd_matters is false")

addCmd.Flags().StringVar(&cmdGroup, "group", "", "unix group to start the command as")
addCmd.Flags().IntVar(&timeoutint, "timeout", 120, "how long (seconds) to wait to get a reply from 'wr manager'")
addCmd.Flags().IntVar(&rtimeoutint, "reserve_timeout", 1, "how long (seconds) to wait before a runner exits when there is no more work'")
addCmd.Flags().BoolVarP(&simpleOutput, "simple", "s", false, "simplify output to only queued job ids")
Expand Down Expand Up @@ -629,6 +633,7 @@ func parseCmdFile(jq *jobqueue.Client, diskSet bool) ([]*jobqueue.Job, bool, boo
jd := &jobqueue.JobDefaults{
RepGrp: cmdRepGroup,
ReqGrp: reqGroup,
Group: cmdGroup,
Cwd: cmdCwd,
CwdMatters: cmdCwdMatters,
ChangeHome: cmdChangeHome,
Expand Down
5 changes: 5 additions & 0 deletions cmd/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ new internal ids is printed.`,
jm.SetReqGroup(reqGroup)
}

if cobraCmd.Flags().Changed("group") {
jm.SetUnixGroup(cmdGroup)
}

req := &jqs.Requirements{}
var setReq bool
if cobraCmd.Flags().Changed("memory") {
Expand Down Expand Up @@ -422,6 +426,7 @@ func init() {
modCmd.Flags().StringVar(&cmdCloudConfigs, "cloud_config_files", "", "in the cloud, comma separated paths of config files to copy to servers created to run these commands")
modCmd.Flags().BoolVar(&cmdCloudSharedDisk, "cloud_shared", false, "mount /shared")
modCmd.Flags().BoolVar(&cmdCloudSharedDiskUnset, "unset_cloud_shared", false, "unset --cloud_shared")
modCmd.Flags().StringVar(&cmdGroup, "group", "", "unix group to start the command as")
modCmd.Flags().StringVar(&cmdEnv, "env", "", "comma-separated list of key=value environment variables to set before running the commands")
modCmd.Flags().StringVar(&cmdQueue, "queue", "", "name of queue to submit to, for schedulers with queues")
modCmd.Flags().StringVar(&cmdQueuesAvoidMod, "queues_avoid", "",
Expand Down
11 changes: 10 additions & 1 deletion jobqueue/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,15 @@ func (c *Client) Execute(ctx context.Context, job *Job, shell string) error {
if strings.Contains(jc, " | ") {
jc = "set -o pipefail; " + jc
}
cmd := exec.Command(shell, "-c", jc) // #nosec Our whole purpose is to allow users to run arbitrary commands via us...

var cmd *exec.Cmd

if job.Group != "" {
cmd = exec.Command("newgrp", job.Group) //nolint:gosec
cmd.Stdin = strings.NewReader(jc)
} else {
cmd = exec.Command(shell, "-c", jc) // #nosec Our whole purpose is to allow users to run arbitrary commands via us...
}

// we'll filter STDERR/OUT of the cmd to keep only the first and last line
// of any contiguous block of \r terminated lines (to mostly eliminate
Expand Down Expand Up @@ -851,6 +859,7 @@ func (c *Client) Execute(ctx context.Context, job *Job, shell string) error {
"LSF_LIBDIR=/dev/null",
"LSF_ENVDIR=/dev/null",
"LSF_BINDIR=" + prependPath,
"SHELL=" + shell,
})
}
cmd.Env = env
Expand Down
16 changes: 16 additions & 0 deletions jobqueue/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ type Job struct {
// you expect to have similar resource requirements.
ReqGroup string

// Group is the group name to run the executable as; a value of empty string
// will use the default group.
Group string

// Requirements describes the resources this Cmd needs to run, such as RAM,
// Disk and time. These may be determined for you by the system (depending
// on Override) based on past experience of running jobs with the same
Expand Down Expand Up @@ -1042,6 +1046,7 @@ type JobModifier struct {
Cmd string
Cwd string
ReqGroup string
Group string
BsubMode string
MonitorDocker string
WithDocker string
Expand All @@ -1053,6 +1058,7 @@ type JobModifier struct {
ChangeHome bool
ChangeHomeSet bool
ReqGroupSet bool
GroupSet bool
Override uint8
OverrideSet bool
Priority uint8
Expand Down Expand Up @@ -1112,6 +1118,11 @@ func (j *JobModifier) SetReqGroup(newVal string) {
j.ReqGroupSet = true
}

func (j *JobModifier) SetUnixGroup(group string) {
j.Group = group
j.GroupSet = true
}

// SetRequirements notes that you want to modify the Requirements of Jobs. You
// can't modify to a nil Requirements, so if req is nil, no set is done.
//
Expand Down Expand Up @@ -1319,6 +1330,11 @@ func (j *JobModifier) Modify(jobs []*Job, server *Server) (map[string]string, er
if j.ReqGroupSet {
job.ReqGroup = j.ReqGroup
}

if j.GroupSet {
job.Group = j.Group
}

if j.Requirements != nil {
if j.Requirements.RAM != 0 {
job.Requirements.RAM = j.Requirements.RAM
Expand Down
46 changes: 46 additions & 0 deletions jobqueue/jobqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"math"
"os"
"os/exec"
"os/user"
"path/filepath"
"runtime"
"strconv"
Expand Down Expand Up @@ -1300,6 +1301,51 @@ func TestJobqueueBasics(t *testing.T) {
So(stdout, ShouldEqual, "c\nd")
})

Convey("You can execute a job as a different group", func() {
server.racmutex.Lock()
server.rc = ""
server.racmutex.Unlock()

groups, err := os.Getgroups()
So(err, ShouldBeNil)
So(len(groups), ShouldBeGreaterThan, 1)

second, err := user.LookupGroupId(strconv.Itoa(groups[1]))
So(err, ShouldBeNil)

inserts, already, err := jq.Add([]*Job{
{Cmd: "id", Cwd: t.TempDir(), Requirements: standardReqs, RepGroup: "manually_added"},
{Cmd: "id ", Group: second.Name, Cwd: t.TempDir(), Requirements: standardReqs, RepGroup: "manually_added"},
}, []string{}, true)
So(err, ShouldBeNil)
So(inserts, ShouldEqual, 2)
So(already, ShouldEqual, 0)

job, err := jq.Reserve(0)
So(err, ShouldBeNil)
So(job, ShouldNotBeNil)

err = jq.Execute(ctx, job, config.RunnerExecShell)
So(err, ShouldBeNil)

stdoutA, err := job.StdOut()
So(err, ShouldBeNil)
So(stdoutA, ShouldNotBeEmpty)

job, err = jq.Reserve(0)
So(err, ShouldBeNil)
So(job, ShouldNotBeNil)

err = jq.Execute(ctx, job, config.RunnerExecShell)
So(err, ShouldBeNil)

stdoutB, err := job.StdOut()
So(err, ShouldBeNil)
So(stdoutB, ShouldNotBeEmpty)

So(stdoutA, ShouldNotEqual, stdoutB)
})

Convey("You can stop the server by sending it a SIGTERM or SIGINT", func() {
err := jq.Disconnect()
So(err, ShouldBeNil)
Expand Down
1 change: 1 addition & 0 deletions jobqueue/serverCLI.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ func (s *Server) itemToJob(ctx context.Context, item *queue.Item, getStd bool, g
job := &Job{
RepGroup: sjob.RepGroup,
ReqGroup: sjob.ReqGroup,
Group: sjob.Group,
LimitGroups: sjob.LimitGroups,
Modules: sjob.Modules,
DepGroups: sjob.DepGroups,
Expand Down
8 changes: 8 additions & 0 deletions jobqueue/serverREST.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type JobViaJSON struct {
Cmd string `json:"cmd"`
Cwd string `json:"cwd"`
ReqGrp string `json:"req_grp"`
Group string `json:"group"`
// Memory is a number and unit suffix, eg. 1G for 1 Gigabyte.
Memory string `json:"memory"`
// Time is a duration with a unit suffix, eg. 1h for 1 hour.
Expand Down Expand Up @@ -114,6 +115,7 @@ type JobDefaults struct {
MountConfigs MountConfigs
compressedEnv []byte
RepGrp string
Group string
// Cwd defaults to /tmp.
Cwd string
ReqGrp string
Expand Down Expand Up @@ -270,6 +272,11 @@ func (jvj *JobViaJSON) Convert(jd *JobDefaults) (*Job, error) {
rg = jvj.ReqGrp
}

group := jd.Group
if jvj.Group != "" {
group = jvj.Group
}

if jvj.CPUs == nil {
cpus = jd.DefaultCPUs()
} else {
Expand Down Expand Up @@ -517,6 +524,7 @@ func (jvj *JobViaJSON) Convert(jd *JobDefaults) (*Job, error) {
CwdMatters: cwdMatters,
ChangeHome: changeHome,
ReqGroup: rg,
Group: group,
Requirements: &jqs.Requirements{RAM: mb, Time: dur, Cores: cpus, Disk: disk, DiskSet: diskSet, Other: other},
Override: uint8(override),
Priority: uint8(priority),
Expand Down
Loading