Skip to content

Commit 64ec713

Browse files
Merge pull request #8 from DIMO-Network/cloud_event_no_subject_and_producer
Removed unbufferedMsg references from motorq example
2 parents 73a2122 + c3bcc2a commit 64ec713

File tree

8 files changed

+122
-338
lines changed

8 files changed

+122
-338
lines changed

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ require (
3636
github.com/volatiletech/null/v8 v8.1.2
3737
github.com/volatiletech/sqlboiler/v4 v4.18.0
3838
github.com/volatiletech/strmangle v0.0.8
39-
go.uber.org/mock v0.5.0
4039
golang.org/x/sync v0.13.0
4140
golang.org/x/time v0.11.0
4241
gotest.tools/v3 v3.5.2

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -891,8 +891,6 @@ go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp
891891
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
892892
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
893893
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
894-
go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
895-
go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
896894
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
897895
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
898896
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=

internal/config/settings.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,10 @@ type Settings struct {
3535
UnbufferedTelemetryConsumerGroup string `yaml:"UNBUFFERED_TELEMETRY_CONSUMER_GROUP"`
3636

3737
// DIS - DIMO Ingest Service
38-
ConvertToCloudEvent bool `yaml:"CONVERT_TO_CLOUD_EVENT"` // if true, telemetry is converted to cloud events before sending to DIS
39-
DimoNodeEndpoint string `yaml:"DIMO_NODE_ENDPOINT"`
40-
Cert string `yaml:"CERT"` // should be secrets
41-
CertKey string `yaml:"CERT_KEY"` // should be secrets
42-
CACert string `yaml:"CA_CERT"` // DIMO Root CA, same for everybody
38+
DimoNodeEndpoint string `yaml:"DIMO_NODE_ENDPOINT"`
39+
Cert string `yaml:"CERT"` // should be secrets
40+
CertKey string `yaml:"CERT_KEY"` // should be secrets
41+
CACert string `yaml:"CA_CERT"` // DIMO Root CA, same for everybody
4342

4443
// Chain - These are standard Polygon values for DIMO
4544
ChainID int64 `yaml:"CHAIN_ID"`

internal/convert/convert.go

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,18 @@ package convert
22

33
import (
44
"encoding/json"
5+
"fmt"
56
"github.com/DIMO-Network/cloudevent"
67
"github.com/DIMO-Network/model-garage/pkg/defaultmodule"
78
"github.com/DIMO-Network/oracle-example/internal/config"
89
dbmodels "github.com/DIMO-Network/oracle-example/internal/db/models"
910
"github.com/DIMO-Network/oracle-example/internal/models"
10-
"github.com/segmentio/ksuid"
11+
"github.com/rs/zerolog"
1112
"time"
1213
)
1314

14-
// ToCloudEvent Convert the external vendor msg payload to a CloudEvent
15-
func ToCloudEvent(veh dbmodels.Vin, msg models.UnbufferedMessageValue, settings config.Settings) (*cloudevent.CloudEvent[json.RawMessage], error) {
15+
// SetProducerAndSubject sets the producer and consumer fields in the CloudEvent
16+
func SetProducerAndSubject(veh dbmodels.Vin, ce *cloudevent.CloudEvent[json.RawMessage], settings config.Settings) error {
1617
// Construct the producer DID
1718
producer := cloudevent.NFTDID{
1819
ChainID: uint64(settings.ChainID),
@@ -31,44 +32,52 @@ func ToCloudEvent(veh dbmodels.Vin, msg models.UnbufferedMessageValue, settings
3132
}.String()
3233
}
3334

34-
ch, err := createCloudEventHeader(msg.Timestamp, producer, subject, cloudevent.TypeStatus)
35-
if err != nil {
36-
return nil, err
37-
}
35+
ce.Subject = subject
36+
ce.Producer = producer
3837

39-
// transform the data to default DIS format
40-
signals, err := mapDataToSignals(msg.Data, msg.Timestamp)
38+
return nil
39+
}
4140

42-
if err != nil {
43-
return nil, err
41+
// ValidateSignals validates the signals from the message
42+
func ValidateSignals(signals interface{}, logger zerolog.Logger) error {
43+
44+
signalsArr, ok := signals.([]interface{})
45+
if !ok {
46+
err := fmt.Errorf("signals is not of type []interface{}")
47+
logger.Error().Err(err).Msg("Invalid type for signals")
48+
return err
4449
}
4550

46-
// Wrap the signals into a struct
47-
wrappedData := struct {
48-
Signals []*defaultmodule.Signal `json:"signals"`
49-
Vin string `json:"vin"`
50-
}{
51-
Signals: signals,
52-
Vin: veh.Vin,
51+
signalArray, err := CastToSliceOfMaps(signalsArr)
52+
if err != nil {
53+
logger.Error().Err(err).Msg("Failed to cast signals to slice of maps")
54+
return err
5355
}
5456

55-
// Marshal the wrapped data to json.RawMessage
56-
data, err := json.Marshal(wrappedData)
57+
// Load the signal map
58+
sigMap, err := defaultmodule.LoadSignalMap()
5759
if err != nil {
58-
return nil, err
60+
return err
5961
}
6062

61-
// Create the CloudEvent
62-
ce := &cloudevent.CloudEvent[json.RawMessage]{
63-
CloudEventHeader: ch,
64-
Data: data,
63+
for _, signal := range signalArray {
64+
name, ok := signal["name"].(string)
65+
if !ok {
66+
logger.Warn().Msgf("Signal name is missing or not a string: %v\n", signal)
67+
continue
68+
}
69+
70+
if _, exists := sigMap[name]; !exists {
71+
logger.Warn().Msgf("Signal %s is not in the signal map\n", name)
72+
continue
73+
}
6574
}
6675

67-
return ce, nil
76+
return nil
6877
}
6978

70-
// mapDataToSignals maps the data from the message to the default DIS signals
71-
func mapDataToSignals(data models.Data, ts time.Time) ([]*defaultmodule.Signal, error) {
79+
// MapDataToSignals maps the data from the message to the default DIS signals
80+
func MapDataToSignals(data models.Data, ts time.Time) ([]*defaultmodule.Signal, error) {
7281
var signals []*defaultmodule.Signal
7382

7483
sigMap, err := defaultmodule.LoadSignalMap()
@@ -124,16 +133,16 @@ func mapDataToSignals(data models.Data, ts time.Time) ([]*defaultmodule.Signal,
124133
return signals, nil
125134
}
126135

127-
// createCloudEvent creates a cloud event from autopi event.
128-
func createCloudEventHeader(ts time.Time, producer, subject, eventType string) (cloudevent.CloudEventHeader, error) {
129-
return cloudevent.CloudEventHeader{
130-
DataContentType: "application/json",
131-
ID: ksuid.New().String(),
132-
Subject: subject,
133-
SpecVersion: "1.0",
134-
Time: ts,
135-
Type: eventType,
136-
DataVersion: "default/v1.0",
137-
Producer: producer,
138-
}, nil
136+
func CastToSliceOfMaps(signals []interface{}) ([]map[string]interface{}, error) {
137+
var result []map[string]interface{}
138+
139+
for _, item := range signals {
140+
castItem, ok := item.(map[string]interface{})
141+
if !ok {
142+
return nil, fmt.Errorf("failed to cast item to map[string]interface{}: %v", item)
143+
}
144+
result = append(result, castItem)
145+
}
146+
147+
return result, nil
139148
}

internal/convert/convert_test.go

Lines changed: 0 additions & 199 deletions
This file was deleted.

internal/models/models.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
package models
22

3-
import (
4-
"time"
5-
)
6-
73
type GraphQLRequest struct {
84
Query string `json:"query"`
95
}
@@ -75,15 +71,6 @@ type GraphQlData[T any] struct {
7571
Data T `json:"data"`
7672
}
7773

78-
type UnbufferedMessageValue struct {
79-
ID string `json:"id"`
80-
DataType string `json:"dataType"`
81-
VehicleID string `json:"vehicleId"`
82-
DeviceID *string `json:"deviceId"`
83-
Timestamp time.Time `json:"timestamp"`
84-
Data Data `json:"data"`
85-
}
86-
8774
type Data struct {
8875
Location Location `json:"location"`
8976
Speed Signal `json:"speed"`

0 commit comments

Comments
 (0)