Skip to content

Commit

Permalink
Error handling and data conversion (#18)
Browse files Browse the repository at this point in the history
* Update global workflows (#3)

* Update global workflows (#7)

* Update global workflows (#8)

* Update global workflows (#10)

* Update global workflows (#9)

* Update global workflows (#12)

* Update global workflows (#11)

* Update global workflows (#13)

* Update global workflows (#14)

* Fix error handling and data conversion in
OPCUAInput ReadBatch function

* Delete .github/PULL_REQUEST_TEMPLATE.md

* Delete .github/dependabot.yaml

* Delete .github/CODEOWNERS

* Delete .github/release-drafter.yml

* Delete .github/renovate.json

* Delete .github/workflows/auto-merge.yaml

* Delete .github/workflows/fork-sync.yaml

* Delete .github/workflows/github-readme-tree.yaml

* Delete .github/workflows/todos-to-issues.yaml

* Delete .github/workflows/sync-labels.yaml

* Delete .github/workflows/require-labels.yaml

* Delete .github/workflows/release-drafter.yaml

* Final fix

* Fix nil value handling in OPCUAInput ReadBatchPull
method

* Fix error message format in OPCUA plugin
  • Loading branch information
devantler authored Nov 16, 2023
1 parent cfcc1f9 commit 5b1f55b
Showing 1 changed file with 72 additions and 10 deletions.
82 changes: 72 additions & 10 deletions plugin/opcua.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (g *OPCUAInput) Connect(ctx context.Context) error {

// Step 3.1: Filter the endpoints based on the selected authentication method.
// This will eliminate endpoints that do not support the chosen method.
selectedEndpoint := g.getReasonableEndpoint(endpoints, selectedAuthentication, g.insecure, g.securityMode, g.securityPolicy,)
selectedEndpoint := g.getReasonableEndpoint(endpoints, selectedAuthentication, g.insecure, g.securityMode, g.securityPolicy)
if selectedEndpoint == nil {
g.log.Errorf("Could not select a suitable endpoint")
return err
Expand Down Expand Up @@ -519,6 +519,8 @@ func (g *OPCUAInput) createMessageFromValue(value interface{}, nodeID string) *s
b := make([]byte, 0)

switch v := value.(type) {
case float32:
b = append(b, []byte(strconv.FormatFloat(float64(v), 'f', -1, 32))...)
case float64:
b = append(b, []byte(strconv.FormatFloat(v, 'f', -1, 64))...)
case string:
Expand All @@ -545,8 +547,62 @@ func (g *OPCUAInput) createMessageFromValue(value interface{}, nodeID string) *s
b = append(b, []byte(strconv.FormatUint(uint64(v), 10))...)
case uint64:
b = append(b, []byte(strconv.FormatUint(v, 10))...)
case float32:
b = append(b, []byte(strconv.FormatFloat(float64(v), 'f', -1, 32))...)
case []float32:
for _, val := range v {
b = append(b, []byte(strconv.FormatFloat(float64(val), 'f', -1, 32))...)
}
case []float64:
for _, val := range v {
b = append(b, []byte(strconv.FormatFloat(val, 'f', -1, 64))...)
}
case []string:
for _, val := range v {
b = append(b, []byte(string(val))...)
}
case []bool:
for _, val := range v {
b = append(b, []byte(strconv.FormatBool(val))...)
}
case []int:
for _, val := range v {
b = append(b, []byte(strconv.Itoa(val))...)
}
case []int8:
for _, val := range v {
b = append(b, []byte(strconv.FormatInt(int64(val), 10))...)
}
case []int16:
for _, val := range v {
b = append(b, []byte(strconv.FormatInt(int64(val), 10))...)
}
case []int32:
for _, val := range v {
b = append(b, []byte(strconv.FormatInt(int64(val), 10))...)
}
case []int64:
for _, val := range v {
b = append(b, []byte(strconv.FormatInt(val, 10))...)
}
case []uint:
for _, val := range v {
b = append(b, []byte(strconv.FormatUint(uint64(val), 10))...)
}
case []uint8:
for _, val := range v {
b = append(b, []byte(strconv.FormatUint(uint64(val), 10))...)
}
case []uint16:
for _, val := range v {
b = append(b, []byte(strconv.FormatUint(uint64(val), 10))...)
}
case []uint32:
for _, val := range v {
b = append(b, []byte(strconv.FormatUint(uint64(val), 10))...)
}
case []uint64:
for _, val := range v {
b = append(b, []byte(strconv.FormatUint(val, 10))...)
}
default:
g.log.Errorf("Unknown type: %T", v)
return nil
Expand Down Expand Up @@ -584,23 +640,24 @@ func (g *OPCUAInput) ReadBatchPull(ctx context.Context) (service.MessageBatch, s
g.log.Errorf("Read failed: %s", err)
// if the error is StatusBadSessionIDInvalid, the session has been closed
// and we need to reconnect.
if err == ua.StatusBadSessionIDInvalid {
switch err {
case ua.StatusBadSessionIDInvalid:
g.client.Close(ctx)
g.client = nil
return nil, nil, service.ErrNotConnected
} else if err == ua.StatusBadCommunicationError {
case ua.StatusBadCommunicationError:
g.client.Close(ctx)
g.client = nil
return nil, nil, service.ErrNotConnected
} else if err == ua.StatusBadConnectionClosed {
case ua.StatusBadConnectionClosed:
g.client.Close(ctx)
g.client = nil
return nil, nil, service.ErrNotConnected
} else if err == ua.StatusBadTimeout {
case ua.StatusBadTimeout:
g.client.Close(ctx)
g.client = nil
return nil, nil, service.ErrNotConnected
} else if err == ua.StatusBadConnectionRejected {
case ua.StatusBadConnectionRejected:
g.client.Close(ctx)
g.client = nil
return nil, nil, service.ErrNotConnected
Expand All @@ -609,6 +666,7 @@ func (g *OPCUAInput) ReadBatchPull(ctx context.Context) (service.MessageBatch, s
// return error and stop executing this function.
return nil, nil, err
}

if resp.Results[0].Status != ua.StatusOK {
g.log.Errorf("Status not OK: %v", resp.Results[0].Status)
}
Expand All @@ -617,8 +675,12 @@ func (g *OPCUAInput) ReadBatchPull(ctx context.Context) (service.MessageBatch, s
msgs := service.MessageBatch{}

for i, node := range g.nodeList {

message := g.createMessageFromValue(resp.Results[i].Value.Value(), node.NodeID.String())
value := resp.Results[i].Value
if value == nil {
g.log.Errorf("Received nil from node: %s", node.NodeID.String())
continue
}
message := g.createMessageFromValue(value.Value(), node.NodeID.String())
if message != nil {
msgs = append(msgs, message)
}
Expand Down

0 comments on commit 5b1f55b

Please sign in to comment.