Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ApplyRow Orchestrator #1245

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -18,119 +17,60 @@ public abstract partial class BaseOrchestrator
/// <summary>
/// Apply a delete on a row. if forceWrite, force the delete.
/// </summary>
internal virtual async Task<(SyncContext Context, bool Applied, Exception Exception)> InternalApplyDeleteAsync(
internal virtual Task<(SyncContext Context, bool Applied, Exception Exception)> InternalApplyDeleteAsync(
ScopeInfo scopeInfo, SyncContext context, BatchInfo batchInfo, SyncRow row, SyncTable schemaTable, long? lastTimestamp, Guid? senderScopeId, bool forceWrite,
DbConnection connection, DbTransaction transaction, IProgress<ProgressArgs> progress, CancellationToken cancellationToken)
{
// get executioning adapter
var syncAdapter = this.GetSyncAdapter(schemaTable, scopeInfo);

// Pre command if exists
var (preCommand, _) = await this.InternalGetCommandAsync(scopeInfo, context, syncAdapter, DbCommandType.PreDeleteRow,
connection, transaction, progress, cancellationToken).ConfigureAwait(false);

if (preCommand != null)
{
try
{
await this.InterceptAsync(new ExecuteCommandArgs(context, preCommand, DbCommandType.PreDeleteRow, connection, transaction), progress, cancellationToken).ConfigureAwait(false);
await preCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
preCommand.Dispose();
}
}

var (command, _) = await this.InternalGetCommandAsync(scopeInfo, context, syncAdapter, DbCommandType.DeleteRow,
connection, transaction, default, default).ConfigureAwait(false);

if (command == null)
return (context, false, null);

var batchArgs = new RowsChangesApplyingArgs(context, batchInfo, [row], schemaTable, SyncRowState.Modified, command, connection, transaction);
await this.InterceptAsync(batchArgs, progress, cancellationToken).ConfigureAwait(false);

if (batchArgs.Cancel || batchArgs.Command == null || batchArgs.SyncRows == null || batchArgs.SyncRows.Count <= 0)
return (context, false, null);

// get the correct pointer to the command from the interceptor in case user change the whole instance
command = batchArgs.Command;

// Set the parameters
this.InternalSetCommandParametersValues(context, command, DbCommandType.DeleteRow, syncAdapter, connection, transaction,
row, senderScopeId, lastTimestamp, true, forceWrite, progress, cancellationToken);

await this.InterceptAsync(new ExecuteCommandArgs(context, command, DbCommandType.DeleteRow, connection, transaction), progress, cancellationToken).ConfigureAwait(false);

Exception exception = null;
var rowDeletedCount = 0;

try
{
rowDeletedCount = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);

// Check if we have a return value instead
var syncRowCountParam = syncAdapter.GetParameter(context, command, "sync_row_count");

// Check if we have an handled error
var syncErrorText = syncAdapter.GetParameter(context, command, "sync_error_text");

if (syncRowCountParam != null && syncRowCountParam.Value != null && syncRowCountParam.Value != DBNull.Value)
rowDeletedCount = (int)syncRowCountParam.Value;

if (syncErrorText != null && syncErrorText.Value != null && syncErrorText.Value != DBNull.Value)
throw new Exception(syncErrorText.Value.ToString());
}
catch (Exception ex)
{
exception = ex;
}
finally
{
command.Dispose();
}

var rowAppliedArgs = new RowsChangesAppliedArgs(context, batchInfo, batchArgs.SyncRows, schemaTable, SyncRowState.Modified, rowDeletedCount, exception, connection, transaction);
await this.InterceptAsync(rowAppliedArgs, progress, cancellationToken).ConfigureAwait(false);

return (context, rowDeletedCount > 0, exception);
return this.InternalApplyChangeAsync(scopeInfo, context, batchInfo, row, schemaTable, lastTimestamp, senderScopeId, forceWrite,
connection, transaction, progress, cancellationToken, DbCommandType.PreDeleteRow, DbCommandType.DeleteRow);
}

/// <summary>
/// Apply a single update in the current datasource. if forceWrite, force the update.
/// </summary>
internal virtual async Task<(SyncContext Context, bool IsApplied, Exception Exception)> InternalApplyUpdateAsync(
internal virtual Task<(SyncContext Context, bool IsApplied, Exception Exception)> InternalApplyUpdateAsync(
ScopeInfo scopeInfo, SyncContext context, BatchInfo batchInfo, SyncRow row, SyncTable schemaTable, long? lastTimestamp, Guid? senderScopeId, bool forceWrite,
DbConnection connection, DbTransaction transaction, IProgress<ProgressArgs> progress, CancellationToken cancellationToken)
{
return this.InternalApplyChangeAsync(scopeInfo, context, batchInfo, row, schemaTable, lastTimestamp, senderScopeId, forceWrite,
connection, transaction, progress, cancellationToken, DbCommandType.PreUpdateRow, DbCommandType.UpdateRow);
}

private async Task<(SyncContext Context, bool Applied, Exception Exception)> InternalApplyChangeAsync(
ScopeInfo scopeInfo, SyncContext context, BatchInfo batchInfo, SyncRow row, SyncTable schemaTable, long? lastTimestamp, Guid? senderScopeId, bool forceWrite,
DbConnection connection, DbTransaction transaction, IProgress<ProgressArgs> progress, CancellationToken cancellationToken, DbCommandType preCommandType, DbCommandType commandType)
{
// get executioning adapter
var syncAdapter = this.GetSyncAdapter(schemaTable, scopeInfo);

// Pre command if exists
var (preCommand, _) = await this.InternalGetCommandAsync(scopeInfo, context, syncAdapter, DbCommandType.PreUpdateRow,
var (preCommand, _) = await this.InternalGetCommandAsync(scopeInfo, context, syncAdapter, preCommandType,
connection, transaction, progress, cancellationToken).ConfigureAwait(false);

if (preCommand != null)
{
try
{
await this.InterceptAsync(new ExecuteCommandArgs(context, preCommand, DbCommandType.PreUpdateRow, connection, transaction), progress, cancellationToken).ConfigureAwait(false);
await this.InterceptAsync(new ExecuteCommandArgs(context, preCommand, preCommandType, connection, transaction), progress, cancellationToken).ConfigureAwait(false);
await preCommand.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
#if NET6_0_OR_GREATER
await preCommand.DisposeAsync().ConfigureAwait(false);
#else
preCommand.Dispose();
#endif
}
}

var (command, _) = await this.InternalGetCommandAsync(scopeInfo, context, syncAdapter, DbCommandType.UpdateRow,
var (command, _) = await this.InternalGetCommandAsync(scopeInfo, context, syncAdapter, commandType,
connection, transaction, default, default).ConfigureAwait(false);

if (command == null)
return (context, false, null);

var batchArgs = new RowsChangesApplyingArgs(context, batchInfo, [row], schemaTable, SyncRowState.Modified, command, connection, transaction);
var batchArgs = new RowsChangesApplyingArgs(context, batchInfo, new List<SyncRow> { row }, schemaTable, SyncRowState.Modified, command, connection, transaction);
await this.InterceptAsync(batchArgs, progress, cancellationToken).ConfigureAwait(false);

if (batchArgs.Cancel || batchArgs.Command == null || batchArgs.SyncRows == null || batchArgs.SyncRows.Count <= 0)
Expand All @@ -140,27 +80,28 @@ public abstract partial class BaseOrchestrator
command = batchArgs.Command;

// Set the parameters value from row
this.InternalSetCommandParametersValues(context, command, DbCommandType.UpdateRow, syncAdapter, connection, transaction,
batchArgs.SyncRows.First(), senderScopeId, lastTimestamp, false, forceWrite, progress, cancellationToken);
this.InternalSetCommandParametersValues(context, command, commandType, syncAdapter, connection, transaction,
row, senderScopeId, lastTimestamp, commandType == DbCommandType.DeleteRow, forceWrite, progress, cancellationToken);

await this.InterceptAsync(new ExecuteCommandArgs(context, command, DbCommandType.UpdateRow, connection, transaction), progress, cancellationToken).ConfigureAwait(false);
await this.InterceptAsync(new ExecuteCommandArgs(context, command, commandType, connection, transaction), progress, cancellationToken).ConfigureAwait(false);

Exception exception = null;
var rowUpdatedCount = 0;
var rowCount = 0;

try
{
rowUpdatedCount = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
rowCount = await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);

// Check if we have a return value instead
var syncRowCountParam = syncAdapter.GetParameter(context, command, "sync_row_count");

// Check if we have an handled error
var syncErrorText = syncAdapter.GetParameter(context, command, "sync_error_text");

if (syncRowCountParam != null && syncRowCountParam.Value != null && syncRowCountParam.Value != DBNull.Value)
rowUpdatedCount = (int)syncRowCountParam.Value;
if (syncRowCountParam is not null && syncRowCountParam.Value is not null and not DBNull)
rowCount = (int)syncRowCountParam.Value;

if (syncErrorText != null && syncErrorText.Value != null && syncErrorText.Value != DBNull.Value)
if (syncErrorText is not null && syncErrorText.Value is not null and not DBNull)
throw new Exception(syncErrorText.Value.ToString());
}
catch (Exception ex)
Expand All @@ -169,13 +110,17 @@ public abstract partial class BaseOrchestrator
}
finally
{
#if NET6_0_OR_GREATER
await command.DisposeAsync().ConfigureAwait(false);
#else
command.Dispose();
#endif
}

var rowAppliedArgs = new RowsChangesAppliedArgs(context, batchInfo, batchArgs.SyncRows, schemaTable, SyncRowState.Modified, rowUpdatedCount, exception, connection, transaction);
var rowAppliedArgs = new RowsChangesAppliedArgs(context, batchInfo, batchArgs.SyncRows, schemaTable, SyncRowState.Modified, rowCount, exception, connection, transaction);
await this.InterceptAsync(rowAppliedArgs, progress, cancellationToken).ConfigureAwait(false);

return (context, rowUpdatedCount > 0, exception);
return (context, rowCount > 0, exception);
}
}
}