7
7
"sync"
8
8
"time"
9
9
10
+ "gopkg.in/yaml.v2"
11
+
10
12
"github.com/baetyl/baetyl-go/v2/context"
11
13
"github.com/baetyl/baetyl-go/v2/errors"
12
14
"github.com/baetyl/baetyl-go/v2/log"
@@ -16,92 +18,58 @@ import (
16
18
)
17
19
18
20
const (
19
- DefaultAccessConf = "/etc/baetyl/access.yml"
20
- DefaultPropsConf = "/etc/baetyl/props.yml"
21
+ DefaultAccessConf = "etc/baetyl/access.yml"
22
+ DefaultPropsConf = "etc/baetyl/props.yml"
23
+ DefaultDriverConf = "etc/baetyl/conf.yml"
21
24
KeyDevice = "device"
22
25
KeyStatus = "status"
23
26
OnlineStatus = "online"
24
27
OfflineStatus = "offline"
25
28
TypeReportEvent = "report"
26
- KeySysExtConf = "BAETYL_SYSTEM_EXT_CONF"
27
29
)
28
30
29
31
var (
30
32
ErrInvalidMessage = errors .New ("invalid device message" )
31
33
ErrInvalidChannel = errors .New ("invalid channel" )
32
34
ErrResponseChannelNotExist = errors .New ("response channel not exist" )
35
+ ErrAccessConfigNotExist = errors .New ("access config not exist" )
36
+ ErrPropsConfigNotExist = errors .New ("properties config not exist" )
33
37
)
34
38
35
- type DeviceProperty struct {
36
- Name string `json:"name,omitempty"`
37
- Type string `json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`
38
- Mode string `json:"mode,omitempty" validate:"regexp=^(ro|rw)?$"`
39
- Visitor PropertyVisitor `json:"visitor,omitempty"`
40
- }
41
-
42
- type PropertyVisitor struct {
43
- Modbus * ModbusVisitor `json:"modbus,omitempty"`
44
- Opcua * OpcuaVisitor `json:"opcua,omitempty"`
45
- Custom * CustomVisitor `json:"custom,omitempty"`
46
- }
47
-
48
- type ModbusVisitor struct {
49
- Function byte `json:"function" validate:"min=1,max=4"`
50
- Address string `json:"address"`
51
- Quantity uint16 `json:"quantity"`
52
- Type string `json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`
53
- Scale float64 `json:"scale"`
54
- SwapByte bool `json:"swapByte"`
55
- SwapRegister bool `json:"swapRegister"`
56
- }
57
-
58
- type OpcuaVisitor struct {
59
- NodeID string `json:"nodeid,omitempty"`
60
- Type string `json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`
61
- }
62
-
63
- type CustomVisitor string
64
-
65
- type Event struct {
66
- Type string `yaml:"type,omitempty" json:"type,omitempty"`
67
- Payload interface {} `yaml:"payload,omitempty" json:"payload,omitempty"`
68
- }
69
-
70
- type DeviceShadow struct {
71
- Name string `json:"name,omitempty"`
72
- Report v1.Report `json:"report,omitempty"`
73
- Desire v1.Desire `json:"desire,omitempty"`
74
- }
75
-
76
39
type DeltaCallback func (* DeviceInfo , v1.Delta ) error
77
-
78
40
type EventCallback func (* DeviceInfo , * Event ) error
79
41
80
42
type Context interface {
81
43
context.Context
82
- SystemConfigExt () * SystemConfig
83
44
GetAllDevices () []DeviceInfo
84
45
ReportDeviceProperties (* DeviceInfo , v1.Report ) error
85
- GetDeviceProperties (info * DeviceInfo ) (* DeviceShadow , error )
46
+ GetDeviceProperties (device * DeviceInfo ) (* DeviceShadow , error )
86
47
RegisterDeltaCallback (cb DeltaCallback ) error
87
48
RegisterEventCallback (cb EventCallback ) error
88
- Online (info * DeviceInfo ) error
89
- Offline (info * DeviceInfo ) error
90
- GetDeviceAccessConfig () (string , error )
91
- GetDevicePropConfigs () (map [string ][]DeviceProperty , error )
49
+ Online (device * DeviceInfo ) error
50
+ Offline (device * DeviceInfo ) error
51
+ GetDriverConfig () string
52
+ GetAccessConfig () map [string ]string
53
+ GetDeviceAccessConfig (device * DeviceInfo ) (string , error )
54
+ GetPropertiesConfig () map [string ][]DeviceProperty
55
+ GetDevicePropertiesConfig (device * DeviceInfo ) ([]DeviceProperty , error )
92
56
Start ()
93
57
io.Closer
94
58
}
95
59
96
60
type DmCtx struct {
97
61
context.Context
98
- log * log.Logger
99
- mqtt * mqtt2.Client
100
- tomb utils.Tomb
101
- eventCb EventCallback
102
- deltaCb DeltaCallback
103
- response sync.Map
104
- msgChs map [string ]chan * v1.Message
62
+ log * log.Logger
63
+ mqtt * mqtt2.Client
64
+ tomb utils.Tomb
65
+ eventCb EventCallback
66
+ deltaCb DeltaCallback
67
+ response sync.Map
68
+ devices map [string ]DeviceInfo
69
+ msgChs map [string ]chan * v1.Message
70
+ driverConfig string
71
+ propsConfig map [string ][]DeviceProperty
72
+ accessConfig map [string ]string
105
73
}
106
74
107
75
func NewContext (confFile string ) Context {
@@ -119,17 +87,31 @@ func NewContext(confFile string) Context {
119
87
lfs = append (lfs , log .Any ("service" , c .ServiceName ()))
120
88
}
121
89
c .log = log .With (lfs ... )
122
- sc := new (SystemConfig )
123
- if err := c .LoadCustomConfig (sc ); err != nil {
124
- c .log .Error ("failed to load system config, to use default config" , log .Error (err ))
125
- utils .UnmarshalYAML (nil , sc )
90
+
91
+ if err := unmarshalYAML (DefaultAccessConf , & c .accessConfig ); err != nil {
92
+ c .log .Error ("failed to load access config, to use default config" , log .Error (err ))
93
+ utils .UnmarshalYAML (nil , & c .accessConfig )
94
+ }
95
+
96
+ if err := unmarshalYAML (DefaultPropsConf , & c .propsConfig ); err != nil {
97
+ c .log .Error ("failed to load props config, to use default config" , log .Error (err ))
98
+ utils .UnmarshalYAML (nil , & c .propsConfig )
126
99
}
127
- c .Store (KeySysExtConf , sc )
128
100
101
+ var dCfg driverConfig
102
+ if err := unmarshalYAML (DefaultDriverConf , & dCfg ); err != nil {
103
+ c .log .Error ("failed to load driver config, to use default config" , log .Error (err ))
104
+ utils .UnmarshalYAML (nil , & dCfg )
105
+ }
106
+ c .driverConfig = dCfg .Driver
107
+
108
+ devices := make (map [string ]DeviceInfo )
129
109
var subs []mqtt2.QOSTopic
130
- for _ , dev := range sc .Devices {
110
+ for _ , dev := range dCfg .Devices {
131
111
subs = append (subs , dev .Delta , dev .Event , dev .GetResponse )
112
+ devices [dev .Name ] = dev
132
113
}
114
+ c .devices = devices
133
115
mqtt , err := c .Context .NewSystemBrokerClient (subs )
134
116
if err != nil {
135
117
c .log .Warn ("fail to create system broker client" , log .Any ("error" , err ))
@@ -143,9 +125,8 @@ func NewContext(confFile string) Context {
143
125
}
144
126
145
127
func (c * DmCtx ) Start () {
146
- devices := c .SystemConfigExt ().Devices
147
- for _ , dev := range devices {
148
- c .msgChs [dev .Name ] = make (chan * v1.Message , 1024 )
128
+ for name , dev := range c .devices {
129
+ c .msgChs [name ] = make (chan * v1.Message , 1024 )
149
130
go c .processing (c .msgChs [dev .Name ])
150
131
}
151
132
}
@@ -159,7 +140,7 @@ func (c *DmCtx) Close() error {
159
140
}
160
141
161
142
func (c * DmCtx ) processDelta (msg * v1.Message ) error {
162
- device := msg .Metadata [KeyDevice ]
143
+ deviceName := msg .Metadata [KeyDevice ]
163
144
if c .deltaCb == nil {
164
145
c .log .Debug ("delta callback not set and message will not be process" )
165
146
return nil
@@ -168,14 +149,19 @@ func (c *DmCtx) processDelta(msg *v1.Message) error {
168
149
if err := msg .Content .Unmarshal (& delta ); err != nil {
169
150
return errors .Trace (err )
170
151
}
171
- if err := c .deltaCb (& DeviceInfo {Name : device }, delta ); err != nil {
152
+ dev , ok := c .devices [deviceName ]
153
+ if ! ok {
154
+ c .log .Warn ("delta callback can not find device" , log .Any ("device" , deviceName ))
155
+ return nil
156
+ }
157
+ if err := c .deltaCb (& dev , delta ); err != nil {
172
158
return errors .Trace (err )
173
159
}
174
160
return nil
175
161
}
176
162
177
163
func (c * DmCtx ) processEvent (msg * v1.Message ) error {
178
- device := msg .Metadata [KeyDevice ]
164
+ deviceName := msg .Metadata [KeyDevice ]
179
165
if c .eventCb == nil {
180
166
c .log .Debug ("event callback not set and message will not be process" )
181
167
return nil
@@ -184,7 +170,12 @@ func (c *DmCtx) processEvent(msg *v1.Message) error {
184
170
if err := msg .Content .Unmarshal (& event ); err != nil {
185
171
return errors .Trace (err )
186
172
}
187
- if err := c .eventCb (& DeviceInfo {Name : device }, & event ); err != nil {
173
+ dev , ok := c .devices [deviceName ]
174
+ if ! ok {
175
+ c .log .Warn ("event callback can not find device" , log .Any ("device" , deviceName ))
176
+ return nil
177
+ }
178
+ if err := c .eventCb (& dev , & event ); err != nil {
188
179
return errors .Trace (err )
189
180
}
190
181
return nil
@@ -247,16 +238,12 @@ func (c *DmCtx) processing(ch chan *v1.Message) {
247
238
}
248
239
}
249
240
250
- func (c * DmCtx ) SystemConfigExt () * SystemConfig {
251
- v , ok := c .Load (KeySysExtConf )
252
- if ! ok {
253
- return nil
254
- }
255
- return v .(* SystemConfig )
256
- }
257
-
258
241
func (c * DmCtx ) GetAllDevices () []DeviceInfo {
259
- return c .SystemConfigExt ().Devices
242
+ var deviceList []DeviceInfo
243
+ for _ , dev := range c .devices {
244
+ deviceList = append (deviceList , dev )
245
+ }
246
+ return deviceList
260
247
}
261
248
262
249
func (c * DmCtx ) ReportDeviceProperties (info * DeviceInfo , report v1.Report ) error {
@@ -361,18 +348,37 @@ func (c *DmCtx) Offline(info *DeviceInfo) error {
361
348
return nil
362
349
}
363
350
364
- func (c * DmCtx ) GetDeviceAccessConfig () (string , error ) {
365
- res , err := ioutil .ReadFile (DefaultAccessConf )
366
- if err != nil {
367
- return "" , errors .Trace (err )
351
+ func (c * DmCtx ) GetDriverConfig () string {
352
+ return c .driverConfig
353
+ }
354
+ func (c * DmCtx ) GetAccessConfig () map [string ]string {
355
+ return c .accessConfig
356
+ }
357
+
358
+ func (c * DmCtx ) GetDeviceAccessConfig (device * DeviceInfo ) (string , error ) {
359
+ if cfg , ok := c .accessConfig [device .Name ]; ok {
360
+ return cfg , nil
361
+ } else {
362
+ return "" , ErrAccessConfigNotExist
368
363
}
369
- return string (res ), nil
370
364
}
371
365
372
- func (c * DmCtx ) GetDevicePropConfigs () (map [string ][]DeviceProperty , error ) {
373
- var res map [string ][]DeviceProperty
374
- if err := c .LoadCustomConfig (& res , DefaultPropsConf ); err != nil {
375
- return nil , errors .Trace (err )
366
+ func (c * DmCtx ) GetPropertiesConfig () map [string ][]DeviceProperty {
367
+ return c .propsConfig
368
+ }
369
+
370
+ func (c * DmCtx ) GetDevicePropertiesConfig (device * DeviceInfo ) ([]DeviceProperty , error ) {
371
+ if cfg , ok := c .propsConfig [device .Name ]; ok {
372
+ return cfg , nil
373
+ } else {
374
+ return nil , ErrPropsConfigNotExist
375
+ }
376
+ }
377
+
378
+ func unmarshalYAML (file string , out interface {}) error {
379
+ bs , err := ioutil .ReadFile (file )
380
+ if err != nil {
381
+ return err
376
382
}
377
- return res , nil
383
+ return yaml . Unmarshal ( bs , out )
378
384
}
0 commit comments