Skip to content

Commit 7b32a8f

Browse files
author
xiejingru
authored
refactor of device management (#249)
1 parent b32706e commit 7b32a8f

File tree

2 files changed

+177
-13
lines changed

2 files changed

+177
-13
lines changed

dmcontext/config.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package dmcontext
22

33
import (
4+
"time"
5+
46
mqtt2 "github.com/baetyl/baetyl-go/v2/mqtt"
57
v1 "github.com/baetyl/baetyl-go/v2/spec/v1"
68
)
@@ -19,6 +21,74 @@ type Topic struct {
1921
GetResponse mqtt2.QOSTopic `yaml:"getResponse,omitempty" json:"getResponse,omitempty"`
2022
}
2123

24+
func (a *AccessConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
25+
var modbus ModbusAccessConfig
26+
if err := unmarshal(&modbus); err == nil {
27+
a.Modbus = &modbus
28+
return nil
29+
}
30+
var opcua OpcuaAccessConfig
31+
if err := unmarshal(&opcua); err == nil {
32+
a.Opcua = &opcua
33+
return nil
34+
}
35+
var custom CustomAccessConfig
36+
if err := unmarshal(&custom); err != nil {
37+
return err
38+
}
39+
a.Custom = &custom
40+
return nil
41+
}
42+
43+
type AccessConfig struct {
44+
Modbus *ModbusAccessConfig `yaml:"modbus,omitempty" json:"modbus,omitempty"`
45+
Opcua *OpcuaAccessConfig `yaml:"opcua,omitempty" json:"opcua,omitempty"`
46+
Custom *CustomAccessConfig `yaml:"custom,omitempty" json:"custom,omitempty"`
47+
}
48+
49+
type ModbusAccessConfig struct {
50+
Tcp *TcpConfig `yaml:"tcp,omitempty" json:"tcp,omitempty"`
51+
Rtu *RtuConfig `yaml:"rtu,omitempty" json:"rtu,omitempty"`
52+
}
53+
54+
type TcpConfig struct {
55+
Address string `yaml:"address,omitempty" json:"address,omitempty" validate:"required"`
56+
Port uint16 `yaml:"port,omitempty" json:"port,omitempty" validate:"required"`
57+
}
58+
59+
type RtuConfig struct {
60+
Port string `yaml:"port,omitempty" json:"port,omitempty" validate:"required"`
61+
BaudRate int `yaml:"baudrate,omitempty" json:"baudrate,omitempty" default:"19200"`
62+
Parity string `yaml:"parity,omitempty" json:"parity,omitempty" default:"E" validate:"regexp=^(E|N|O)?$"`
63+
DataBit int `yaml:"databit,omitempty" json:"databit,omitempty" default:"8" validate:"min=5, max=8"`
64+
StopBit int `yaml:"stopbit,omitempty" json:"stopbit,omitempty" default:"1" validate:"min=1, max=2"`
65+
}
66+
67+
type OpcuaAccessConfig struct {
68+
ID byte `yaml:"id,omitempty" json:"id,omitempty"`
69+
Timeout time.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"`
70+
Security OpcuaSecurity `yaml:"security,omitempty" json:"security,omitempty"`
71+
Auth OpcuaAuth `yaml:"auth,omitempty" json:"auth,omitempty"`
72+
Certificate OpcuaCertificate `yaml:"certificate,omitempty" json:"certificate,omitempty"`
73+
}
74+
75+
type OpcuaSecurity struct {
76+
Policy string `yaml:"policy,omitempty" json:"policy,omitempty"`
77+
Mode string `yaml:"mode,omitempty" json:"mode,omitempty"`
78+
}
79+
80+
type OpcuaAuth struct {
81+
Username string `yaml:"username,omitempty" json:"username,omitempty"`
82+
Password string `yaml:"password,omitempty" json:"password,omitempty"`
83+
}
84+
85+
type OpcuaCertificate struct {
86+
Cert string `yaml:"certFile,omitempty" json:"certFile,omitempty"`
87+
Key string `yaml:"keyFile,omitempty" json:"keyFile,omitempty"`
88+
}
89+
90+
type CustomAccessConfig string
91+
2292
type DeviceProperty struct {
2393
Name string `yaml:"name,omitempty" json:"name,omitempty"`
2494
Type string `yaml:"type,omitempty" json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`

dmcontext/context.go

Lines changed: 107 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/json"
55
"io"
66
"io/ioutil"
7+
"strconv"
78
"sync"
89
"time"
910

@@ -34,6 +35,18 @@ var (
3435
ErrResponseChannelNotExist = errors.New("response channel not exist")
3536
ErrAccessConfigNotExist = errors.New("access config not exist")
3637
ErrPropsConfigNotExist = errors.New("properties config not exist")
38+
ErrDeviceNotExist = errors.New("device not exist")
39+
ErrTypeNotSupported = errors.New("type not supported")
40+
)
41+
42+
const (
43+
TypeInt16 = "int16"
44+
TypeInt32 = "int32"
45+
TypeInt64 = "int64"
46+
TypeFloat32 = "float32"
47+
TypeFloat64 = "float64"
48+
TypeBool = "bool"
49+
TypeString = "string"
3750
)
3851

3952
type DeltaCallback func(*DeviceInfo, v1.Delta) error
@@ -49,8 +62,8 @@ type Context interface {
4962
Online(device *DeviceInfo) error
5063
Offline(device *DeviceInfo) error
5164
GetDriverConfig() string
52-
GetAccessConfig() map[string]string
53-
GetDeviceAccessConfig(device *DeviceInfo) (string, error)
65+
GetAccessConfig() map[string]AccessConfig
66+
GetDeviceAccessConfig(device *DeviceInfo) (*AccessConfig, error)
5467
GetPropertiesConfig() map[string][]DeviceProperty
5568
GetDevicePropertiesConfig(device *DeviceInfo) ([]DeviceProperty, error)
5669
Start()
@@ -69,7 +82,7 @@ type DmCtx struct {
6982
msgChs map[string]chan *v1.Message
7083
driverConfig string
7184
propsConfig map[string][]DeviceProperty
72-
accessConfig map[string]string
85+
accessConfig map[string]AccessConfig
7386
}
7487

7588
func NewContext(confFile string) Context {
@@ -146,13 +159,16 @@ func (c *DmCtx) processDelta(msg *v1.Message) error {
146159
return nil
147160
}
148161
var delta v1.Delta
149-
if err := msg.Content.Unmarshal(&delta); err != nil {
162+
if err := msg.Content.ExactUnmarshal(&delta); err != nil {
150163
return errors.Trace(err)
151164
}
152165
dev, ok := c.devices[deviceName]
153166
if !ok {
154-
c.log.Warn("delta callback can not find device", log.Any("device", deviceName))
155-
return nil
167+
return errors.Trace(ErrDeviceNotExist)
168+
}
169+
delta, err := c.parsePropertyValues(deviceName, delta)
170+
if err != nil {
171+
return errors.Trace(err)
156172
}
157173
if err := c.deltaCb(&dev, delta); err != nil {
158174
return errors.Trace(err)
@@ -182,8 +198,8 @@ func (c *DmCtx) processEvent(msg *v1.Message) error {
182198
}
183199

184200
func (c *DmCtx) processResponse(msg *v1.Message) error {
185-
device := msg.Metadata[KeyDevice]
186-
val, ok := c.response.Load(device)
201+
deviceName := msg.Metadata[KeyDevice]
202+
val, ok := c.response.Load(deviceName)
187203
if !ok {
188204
return errors.Trace(ErrResponseChannelNotExist)
189205
}
@@ -192,12 +208,21 @@ func (c *DmCtx) processResponse(msg *v1.Message) error {
192208
return errors.Trace(ErrInvalidChannel)
193209
}
194210
var shad *DeviceShadow
195-
if err := msg.Content.Unmarshal(&shad); err != nil {
211+
if err := msg.Content.ExactUnmarshal(&shad); err != nil {
196212
return errors.Trace(err)
197213
}
198214
if !ok {
199215
return errors.Trace(ErrInvalidMessage)
200216
}
217+
var err error
218+
shad.Report, err = c.parsePropertyValues(deviceName, shad.Report)
219+
if err != nil {
220+
return errors.Trace(err)
221+
}
222+
shad.Desire, err = c.parsePropertyValues(deviceName, shad.Desire)
223+
if err != nil {
224+
return errors.Trace(err)
225+
}
201226
select {
202227
case ch <- shad:
203228
default:
@@ -351,15 +376,15 @@ func (c *DmCtx) Offline(info *DeviceInfo) error {
351376
func (c *DmCtx) GetDriverConfig() string {
352377
return c.driverConfig
353378
}
354-
func (c *DmCtx) GetAccessConfig() map[string]string {
379+
func (c *DmCtx) GetAccessConfig() map[string]AccessConfig {
355380
return c.accessConfig
356381
}
357382

358-
func (c *DmCtx) GetDeviceAccessConfig(device *DeviceInfo) (string, error) {
383+
func (c *DmCtx) GetDeviceAccessConfig(device *DeviceInfo) (*AccessConfig, error) {
359384
if cfg, ok := c.accessConfig[device.Name]; ok {
360-
return cfg, nil
385+
return &cfg, nil
361386
} else {
362-
return "", ErrAccessConfigNotExist
387+
return nil, ErrAccessConfigNotExist
363388
}
364389
}
365390

@@ -382,3 +407,72 @@ func unmarshalYAML(file string, out interface{}) error {
382407
}
383408
return yaml.Unmarshal(bs, out)
384409
}
410+
411+
func (c *DmCtx) parsePropertyValues(devName string, props map[string]interface{}) (map[string]interface{}, error) {
412+
res := make(map[string]interface{})
413+
vals, ok := c.propsConfig[devName]
414+
if !ok {
415+
return nil, errors.Trace(ErrDeviceNotExist)
416+
}
417+
cfgs := make(map[string]DeviceProperty)
418+
for _, val := range vals {
419+
cfgs[val.Name] = val
420+
}
421+
for key, val := range props {
422+
if cfg, ok := cfgs[key]; ok {
423+
pVal, err := parsePropertyValue(cfg.Type, val)
424+
if err != nil {
425+
return nil, errors.Trace(err)
426+
}
427+
res[key] = pVal
428+
} else {
429+
return nil, errors.Trace(ErrPropsConfigNotExist)
430+
}
431+
}
432+
return res, nil
433+
}
434+
435+
func parsePropertyValue(tpy string, val interface{}) (interface{}, error) {
436+
// it is json.Number (string actually) when val is number
437+
switch tpy {
438+
case TypeInt16:
439+
num, _ := val.(json.Number)
440+
i, err := strconv.ParseInt(num.String(), 10, 16)
441+
if err != nil {
442+
return nil, errors.Trace(err)
443+
}
444+
return int16(i), nil
445+
case TypeInt32:
446+
num, _ := val.(json.Number)
447+
i, err := strconv.ParseInt(num.String(), 10, 32)
448+
if err != nil {
449+
return nil, errors.Trace(err)
450+
}
451+
return int32(i), nil
452+
case TypeInt64:
453+
num, _ := val.(json.Number)
454+
i, err := strconv.ParseInt(num.String(), 10, 64)
455+
if err != nil {
456+
return nil, errors.Trace(err)
457+
}
458+
return i, nil
459+
case TypeFloat32:
460+
num, _ := val.(json.Number)
461+
f, err := strconv.ParseFloat(num.String(), 32)
462+
if err != nil {
463+
return nil, errors.Trace(err)
464+
}
465+
return float32(f), nil
466+
case TypeFloat64:
467+
num, _ := val.(json.Number)
468+
f, err := strconv.ParseFloat(num.String(), 64)
469+
if err != nil {
470+
return nil, errors.Trace(err)
471+
}
472+
return f, nil
473+
case TypeBool, TypeString:
474+
return val, nil
475+
default:
476+
return nil, errors.Trace(ErrTypeNotSupported)
477+
}
478+
}

0 commit comments

Comments
 (0)