Skip to content
214 changes: 190 additions & 24 deletions api/types/load_traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

apitypes "k8s.io/apimachinery/pkg/types"
"gopkg.in/yaml.v2"
)

// ContentType represents the format of response.
Expand All @@ -31,6 +32,27 @@ func (ct ContentType) Validate() error {
}
}

// ExecutionMode represents the execution strategy for generating requests.
type ExecutionMode string

const (
// ModeWeightedRandom generates requests randomly based on weighted distribution.
ModeWeightedRandom ExecutionMode = "weighted-random"
// ModeTimeSeries replays requests from time-bucketed audit logs.
ModeTimeSeries ExecutionMode = "time-series"
)

// Validate returns error if ExecutionMode is not supported.
func (em ExecutionMode) Validate() error {
switch em {
case ModeWeightedRandom, ModeTimeSeries:
return nil
default:
return fmt.Errorf("unsupported execution mode: %s", em)
}
}


// LoadProfile defines how to create load traffic from one host to kube-apiserver.
type LoadProfile struct {
// Version defines the version of this object.
Expand All @@ -41,14 +63,8 @@ type LoadProfile struct {
Spec LoadProfileSpec `json:"spec" yaml:"spec"`
}

// LoadProfileSpec defines the load traffic for traget resource.
// LoadProfileSpec defines the load traffic for target resource.
type LoadProfileSpec struct {
// Rate defines the maximum requests per second (zero is no limit).
Rate float64 `json:"rate" yaml:"rate"`
// Total defines the total number of requests.
Total int `json:"total" yaml:"total"`
// Duration defines the running time in seconds.
Duration int `json:"duration" yaml:"duration"`
// Conns defines total number of long connections used for traffic.
Conns int `json:"conns" yaml:"conns"`
// Client defines total number of HTTP clients.
Expand All @@ -61,9 +77,12 @@ type LoadProfileSpec struct {
// retrying upon receiving "Retry-After" headers and 429 status-code
// in the response (<= 0 means no retry).
MaxRetries int `json:"maxRetries" yaml:"maxRetries"`
// Requests defines the different kinds of requests with weights.
// The executor should randomly pick by weight.
Requests []*WeightedRequest `json:"requests" yaml:"requests"`

// Mode defines the execution strategy (weighted-random, time-series, etc.).
Mode ExecutionMode `json:"mode" yaml:"mode"`
// ModeConfig contains mode-specific configuration.
// This is automatically deserialized to the correct type based on Mode.
ModeConfig ModeConfig `json:"modeConfig" yaml:"modeConfig"`
}

// KubeGroupVersionResource identifies the resource URI.
Expand Down Expand Up @@ -201,34 +220,181 @@ func (lp LoadProfile) Validate() error {
return lp.Spec.Validate()
}

// Validate verifies fields of LoadProfileSpec.
func (spec LoadProfileSpec) Validate() error {
if spec.Conns <= 0 {
return fmt.Errorf("conns requires > 0: %v", spec.Conns)
// UnmarshalYAML implements custom YAML unmarshaling for LoadProfileSpec.
// It automatically deserializes ModeConfig to the correct concrete type based on Mode.
// It also provides backward compatibility for legacy format (without mode field).
func (spec *LoadProfileSpec) UnmarshalYAML(unmarshal func(interface{}) error) error {
// Create a temporary struct that has all fields explicitly (no embedding)
type tempSpec struct {
Conns int `yaml:"conns"`
Client int `yaml:"client"`
ContentType ContentType `yaml:"contentType"`
DisableHTTP2 bool `yaml:"disableHTTP2"`
MaxRetries int `yaml:"maxRetries"`
Mode ExecutionMode `yaml:"mode"`
ModeConfig map[string]interface{} `yaml:"modeConfig"`

// Legacy fields (for backward compatibility)
Rate float64 `yaml:"rate"`
Total int `yaml:"total"`
Duration int `yaml:"duration"`
Requests []*WeightedRequest `yaml:"requests"`
}

temp := &tempSpec{}
if err := unmarshal(temp); err != nil {
return err
}

// Copy common fields
spec.Conns = temp.Conns
spec.Client = temp.Client
spec.ContentType = temp.ContentType
spec.DisableHTTP2 = temp.DisableHTTP2
spec.MaxRetries = temp.MaxRetries

// Check if this is legacy format (no mode specified but has requests)
if temp.Mode == "" && len(temp.Requests) > 0 {
// Auto-migrate legacy format to weighted-random mode
spec.Mode = ModeWeightedRandom
spec.ModeConfig = &WeightedRandomConfig{
Rate: temp.Rate,
Total: temp.Total,
Duration: temp.Duration,
Requests: temp.Requests,
}
return nil
}

// New format: mode is specified
spec.Mode = temp.Mode

// Now unmarshal ModeConfig based on Mode
if temp.ModeConfig != nil {
var config ModeConfig
switch temp.Mode {
case ModeWeightedRandom:
config = &WeightedRandomConfig{}
case ModeTimeSeries:
config = &TimeSeriesConfig{}
default:
return fmt.Errorf("unknown mode: %s", temp.Mode)
}

// Convert map to YAML bytes and unmarshal into typed struct
data, err := yaml.Marshal(temp.ModeConfig)
if err != nil {
return fmt.Errorf("failed to marshal modeConfig: %w", err)
}
if err := yaml.Unmarshal(data, config); err != nil {
return fmt.Errorf("failed to unmarshal modeConfig for mode %s: %w", temp.Mode, err)
}
spec.ModeConfig = config
}

return nil
}

// UnmarshalJSON implements custom JSON unmarshaling for LoadProfileSpec.
// It automatically deserializes ModeConfig to the correct concrete type based on Mode.
// It also provides backward compatibility for legacy format (without mode field).
func (spec *LoadProfileSpec) UnmarshalJSON(data []byte) error {
// Create a temporary struct that has all fields explicitly (no embedding)
type tempSpec struct {
Conns int `json:"conns"`
Client int `json:"client"`
ContentType ContentType `json:"contentType"`
DisableHTTP2 bool `json:"disableHTTP2"`
MaxRetries int `json:"maxRetries"`
Mode ExecutionMode `json:"mode"`
ModeConfig map[string]interface{} `json:"modeConfig"`

// Legacy fields (for backward compatibility)
Rate float64 `json:"rate"`
Total int `json:"total"`
Duration int `json:"duration"`
Requests []*WeightedRequest `json:"requests"`
}

temp := &tempSpec{}
if err := json.Unmarshal(data, temp); err != nil {
return err
}

if spec.Rate < 0 {
return fmt.Errorf("rate requires >= 0: %v", spec.Rate)
// Copy common fields
spec.Conns = temp.Conns
spec.Client = temp.Client
spec.ContentType = temp.ContentType
spec.DisableHTTP2 = temp.DisableHTTP2
spec.MaxRetries = temp.MaxRetries

// Check if this is legacy format (no mode specified but has requests)
if temp.Mode == "" && len(temp.Requests) > 0 {
// Auto-migrate legacy format to weighted-random mode
spec.Mode = ModeWeightedRandom
spec.ModeConfig = &WeightedRandomConfig{
Rate: temp.Rate,
Total: temp.Total,
Duration: temp.Duration,
Requests: temp.Requests,
}
return nil
}

if spec.Total <= 0 && spec.Duration <= 0 {
return fmt.Errorf("total requires > 0: %v or duration > 0s: %v", spec.Total, spec.Duration)
// New format: mode is specified
spec.Mode = temp.Mode

// Now unmarshal ModeConfig based on Mode
if temp.ModeConfig != nil {
var config ModeConfig
switch temp.Mode {
case ModeWeightedRandom:
config = &WeightedRandomConfig{}
case ModeTimeSeries:
config = &TimeSeriesConfig{}
default:
return fmt.Errorf("unknown mode: %s", temp.Mode)
}

// Convert map to JSON bytes and unmarshal into typed struct
data, err := json.Marshal(temp.ModeConfig)
if err != nil {
return fmt.Errorf("failed to marshal modeConfig: %w", err)
}
if err := json.Unmarshal(data, config); err != nil {
return fmt.Errorf("failed to unmarshal modeConfig for mode %s: %w", temp.Mode, err)
}
spec.ModeConfig = config
}

return nil
}


// Validate verifies fields of LoadProfileSpec.
func (spec *LoadProfileSpec) Validate() error {

// Validate common fields
if spec.Conns <= 0 {
return fmt.Errorf("conns requires > 0: %v", spec.Conns)
}

if spec.Client <= 0 {
return fmt.Errorf("client requires > 0: %v", spec.Client)
}

err := spec.ContentType.Validate()
if err != nil {
if err := spec.ContentType.Validate(); err != nil {
return err
}

for idx, req := range spec.Requests {
if err := req.Validate(); err != nil {
return fmt.Errorf("idx: %v request: %v", idx, err)
}
if err := spec.Mode.Validate(); err != nil {
return err
}

if spec.ModeConfig == nil {
return fmt.Errorf("modeConfig is required")
}

return nil
}

Expand Down
Loading