Skip to content

Commit

Permalink
Merge pull request #394 from mycroes/sync
Browse files Browse the repository at this point in the history
Add synchronization
  • Loading branch information
mycroes authored Jun 2, 2021
2 parents 632e1c1 + f67b1e7 commit ea3beff
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 77 deletions.
28 changes: 28 additions & 0 deletions S7.Net/Internal/TaskQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace S7.Net.Internal
{
internal class TaskQueue
{
private static readonly object Sentinel = new object();

private Task prev = Task.FromResult(Sentinel);

public async Task<T> Enqueue<T>(Func<Task<T>> action)
{
var tcs = new TaskCompletionSource<object>();
await Interlocked.Exchange(ref prev, tcs.Task).ConfigureAwait(false);

try
{
return await action.Invoke().ConfigureAwait(false);
}
finally
{
tcs.SetResult(Sentinel);
}
}
}
}
14 changes: 14 additions & 0 deletions S7.Net/PLC.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using S7.Net.Internal;
using S7.Net.Protocol;
using S7.Net.Types;

Expand All @@ -13,6 +15,8 @@ namespace S7.Net
/// </summary>
public partial class Plc : IDisposable
{
private readonly TaskQueue queue = new TaskQueue();

private const int CONNECTION_TIMED_OUT_ERROR_CODE = 10060;

//TCP connection to device
Expand Down Expand Up @@ -242,6 +246,16 @@ internal static void ValidateResponseCode(ReadWriteErrorCode statusCode)
}
}

private Stream GetStreamIfAvailable()
{
if (_stream == null)
{
throw new PlcException(ErrorCode.ConnectionError, "Plc is not connected");
}

return _stream;
}

#region IDisposable Support
private bool disposedValue = false; // To detect redundant calls

Expand Down
104 changes: 57 additions & 47 deletions S7.Net/PlcAsynchronous.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
using S7.Net.Types;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Threading.Tasks;
using S7.Net.Protocol;
using System.IO;
using System.Threading;
using S7.Net.Protocol.S7;

Expand All @@ -28,9 +28,14 @@ public async Task OpenAsync(CancellationToken cancellationToken = default)
var stream = await ConnectAsync().ConfigureAwait(false);
try
{
cancellationToken.ThrowIfCancellationRequested();
await EstablishConnection(stream, cancellationToken).ConfigureAwait(false);
_stream = stream;
await queue.Enqueue(async () =>
{
cancellationToken.ThrowIfCancellationRequested();
await EstablishConnection(stream, cancellationToken).ConfigureAwait(false);
_stream = stream;

return default(object);
}).ConfigureAwait(false);
}
catch(Exception)
{
Expand All @@ -47,29 +52,30 @@ private async Task<NetworkStream> ConnectAsync()
return tcpClient.GetStream();
}

private async Task EstablishConnection(NetworkStream stream, CancellationToken cancellationToken)
private async Task EstablishConnection(Stream stream, CancellationToken cancellationToken)
{
await RequestConnection(stream, cancellationToken).ConfigureAwait(false);
await SetupConnection(stream, cancellationToken).ConfigureAwait(false);
}

private async Task RequestConnection(NetworkStream stream, CancellationToken cancellationToken)
private async Task RequestConnection(Stream stream, CancellationToken cancellationToken)
{
var requestData = ConnectionRequest.GetCOTPConnectionRequest(CPU, Rack, Slot);
await stream.WriteAsync(requestData, 0, requestData.Length).ConfigureAwait(false);
var response = await COTP.TPDU.ReadAsync(stream, cancellationToken).ConfigureAwait(false);
var response = await NoLockRequestTpduAsync(stream, requestData, cancellationToken).ConfigureAwait(false);

if (response.PDUType != COTP.PduType.ConnectionConfirmed)
{
throw new InvalidDataException("Connection request was denied", response.TPkt.Data, 1, 0x0d);
}
}

private async Task SetupConnection(NetworkStream stream, CancellationToken cancellationToken)
private async Task SetupConnection(Stream stream, CancellationToken cancellationToken)
{
var setupData = GetS7ConnectionSetup();
await stream.WriteAsync(setupData, 0, setupData.Length).ConfigureAwait(false);

var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken).ConfigureAwait(false);
var s7data = await NoLockRequestTsduAsync(stream, setupData, 0, setupData.Length, cancellationToken)
.ConfigureAwait(false);

if (s7data.Length < 2)
throw new WrongNumberOfBytesException("Not enough data received in response to Communication Setup");

Expand Down Expand Up @@ -112,7 +118,7 @@ public async Task<byte[]> ReadBytesAsync(DataType dataType, int db, int startByt
}

/// <summary>
/// Read and decode a certain number of bytes of the "VarType" provided.
/// Read and decode a certain number of bytes of the "VarType" provided.
/// This can be used to read multiple consecutive variables of the same type (Word, DWord, Int, etc).
/// If the read was not successful, check LastErrorCode or LastErrorString.
/// </summary>
Expand Down Expand Up @@ -179,10 +185,10 @@ public async Task<byte[]> ReadBytesAsync(DataType dataType, int db, int startByt
}

/// <summary>
/// Reads all the bytes needed to fill a class in C#, starting from a certain address, and set all the properties values to the value that are read from the PLC.
/// Reads all the bytes needed to fill a class in C#, starting from a certain address, and set all the properties values to the value that are read from the PLC.
/// This reads only properties, it doesn't read private variable or public variable without {get;set;} specified.
/// </summary>
/// <param name="sourceClass">Instance of the class that will store the values</param>
/// <param name="sourceClass">Instance of the class that will store the values</param>
/// <param name="db">Index of the DB; es.: 1 is for DB1</param>
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
Expand All @@ -205,7 +211,7 @@ public async Task<Tuple<int, object>> ReadClassAsync(object sourceClass, int db,
}

/// <summary>
/// Reads all the bytes needed to fill a class in C#, starting from a certain address, and set all the properties values to the value that are read from the PLC.
/// Reads all the bytes needed to fill a class in C#, starting from a certain address, and set all the properties values to the value that are read from the PLC.
/// This reads only properties, it doesn't read private variable or public variable without {get;set;} specified. To instantiate the class defined by the generic
/// type, the class needs a default constructor.
/// </summary>
Expand All @@ -221,7 +227,7 @@ public async Task<Tuple<int, object>> ReadClassAsync(object sourceClass, int db,
}

/// <summary>
/// Reads all the bytes needed to fill a class in C#, starting from a certain address, and set all the properties values to the value that are read from the PLC.
/// Reads all the bytes needed to fill a class in C#, starting from a certain address, and set all the properties values to the value that are read from the PLC.
/// This reads only properties, it doesn't read private variable or public variable without {get;set;} specified.
/// </summary>
/// <typeparam name="T">The class that will be instantiated</typeparam>
Expand All @@ -245,29 +251,26 @@ public async Task<Tuple<int, object>> ReadClassAsync(object sourceClass, int db,
}

/// <summary>
/// Reads multiple vars in a single request.
/// Reads multiple vars in a single request.
/// You have to create and pass a list of DataItems and you obtain in response the same list with the values.
/// Values are stored in the property "Value" of the dataItem and are already converted.
/// If you don't want the conversion, just create a dataItem of bytes.
/// If you don't want the conversion, just create a dataItem of bytes.
/// The number of DataItems as well as the total size of the requested data can not exceed a certain limit (protocol restriction).
/// </summary>
/// <param name="dataItems">List of dataitems that contains the list of variables that must be read.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
public async Task<List<DataItem>> ReadMultipleVarsAsync(List<DataItem> dataItems, CancellationToken cancellationToken = default)
{
//Snap7 seems to choke on PDU sizes above 256 even if snap7
//Snap7 seems to choke on PDU sizes above 256 even if snap7
//replies with bigger PDU size in connection setup.
AssertPduSizeForRead(dataItems);

var stream = GetStreamIfAvailable();

try
{
var dataToSend = BuildReadRequestPackage(dataItems.Select(d => DataItem.GetDataItemAddress(d)).ToList());
await stream.WriteAsync(dataToSend, 0, dataToSend.Length).ConfigureAwait(false);
var s7data = await RequestTsduAsync(dataToSend, cancellationToken);

var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken).ConfigureAwait(false);
ValidateResponseCode((ReadWriteErrorCode)s7data[14]);

ParseDataIntoDataItems(s7data, dataItems);
Expand Down Expand Up @@ -435,12 +438,9 @@ public async Task WriteClassAsync(object classValue, int db, int startByteAdr =

private async Task ReadBytesWithSingleRequestAsync(DataType dataType, int db, int startByteAdr, byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var stream = GetStreamIfAvailable();

var dataToSend = BuildReadRequestPackage(new [] { new DataItemAddress(dataType, db, startByteAdr, count)});
await stream.WriteAsync(dataToSend, 0, dataToSend.Length, cancellationToken);

var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken).ConfigureAwait(false);
var s7data = await RequestTsduAsync(dataToSend, cancellationToken);
AssertReadResponse(s7data, count);

Array.Copy(s7data, 18, buffer, offset, count);
Expand All @@ -456,13 +456,11 @@ public async Task WriteAsync(params DataItem[] dataItems)
{
AssertPduSizeForWrite(dataItems);

var stream = GetStreamIfAvailable();

var message = new ByteArray();
var length = S7WriteMultiple.CreateRequest(message, dataItems);
await stream.WriteAsync(message.Array, 0, length).ConfigureAwait(false);

var response = await COTP.TSDU.ReadAsync(stream, CancellationToken.None).ConfigureAwait(false);
var response = await RequestTsduAsync(message.Array, 0, length).ConfigureAwait(false);

S7WriteMultiple.ParseResponse(response, response.Length, dataItems);
}

Expand All @@ -476,15 +474,11 @@ public async Task WriteAsync(params DataItem[] dataItems)
/// <returns>A task that represents the asynchronous write operation.</returns>
private async Task WriteBytesWithASingleRequestAsync(DataType dataType, int db, int startByteAdr, byte[] value, int dataOffset, int count, CancellationToken cancellationToken)
{

try
{
var stream = GetStreamIfAvailable();
var dataToSend = BuildWriteBytesPackage(dataType, db, startByteAdr, value, dataOffset, count);
var s7data = await RequestTsduAsync(dataToSend, cancellationToken).ConfigureAwait(false);

await stream.WriteAsync(dataToSend, 0, dataToSend.Length, cancellationToken).ConfigureAwait(false);

var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken).ConfigureAwait(false);
ValidateResponseCode((ReadWriteErrorCode)s7data[14]);
}
catch (OperationCanceledException)
Expand All @@ -499,15 +493,11 @@ private async Task WriteBytesWithASingleRequestAsync(DataType dataType, int db,

private async Task WriteBitWithASingleRequestAsync(DataType dataType, int db, int startByteAdr, int bitAdr, bool bitValue, CancellationToken cancellationToken)
{
var stream = GetStreamIfAvailable();

try
{
var dataToSend = BuildWriteBitPackage(dataType, db, startByteAdr, bitValue, bitAdr);
var s7data = await RequestTsduAsync(dataToSend, cancellationToken).ConfigureAwait(false);

await stream.WriteAsync(dataToSend, 0, dataToSend.Length).ConfigureAwait(false);

var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken).ConfigureAwait(false);
ValidateResponseCode((ReadWriteErrorCode)s7data[14]);
}
catch (OperationCanceledException)
Expand All @@ -520,13 +510,33 @@ private async Task WriteBitWithASingleRequestAsync(DataType dataType, int db, in
}
}

private Stream GetStreamIfAvailable()
private Task<byte[]> RequestTsduAsync(byte[] requestData, CancellationToken cancellationToken = default) =>
RequestTsduAsync(requestData, 0, requestData.Length, cancellationToken);

private Task<byte[]> RequestTsduAsync(byte[] requestData, int offset, int length, CancellationToken cancellationToken = default)
{
if (_stream == null)
{
throw new PlcException(ErrorCode.ConnectionError, "Plc is not connected");
}
return _stream;
var stream = GetStreamIfAvailable();

return queue.Enqueue(() =>
NoLockRequestTsduAsync(stream, requestData, offset, length, cancellationToken));
}

private static async Task<COTP.TPDU> NoLockRequestTpduAsync(Stream stream, byte[] requestData,
CancellationToken cancellationToken = default)
{
await stream.WriteAsync(requestData, 0, requestData.Length, cancellationToken).ConfigureAwait(false);
var response = await COTP.TPDU.ReadAsync(stream, cancellationToken).ConfigureAwait(false);

return response;
}

private static async Task<byte[]> NoLockRequestTsduAsync(Stream stream, byte[] requestData, int offset, int length,
CancellationToken cancellationToken = default)
{
await stream.WriteAsync(requestData, offset, length, cancellationToken).ConfigureAwait(false);
var response = await COTP.TSDU.ReadAsync(stream, cancellationToken).ConfigureAwait(false);

return response;
}
}
}
Loading

0 comments on commit ea3beff

Please sign in to comment.