Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 90 additions & 54 deletions v8/Client/ParallelServiceProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public IDictionary<string, TResponse> Execute<TRequest, TResponse>(IDictionary<s
errorHandler)
.ToDictionary(r => r.Key, r => r.Value);
}

/// <summary>
/// Performs data parallelism on a collection of type <see cref="DiscoveryRequest"/> to execute <see cref="IDiscoveryService"/>.Execute() requests concurrently
/// </summary>
Expand Down Expand Up @@ -205,7 +205,7 @@ public IEnumerable<TResponse> Execute<TRequest, TResponse>(IEnumerable<TRequest>
#endregion

#region Core Parallel Execution Method <TRequest, TResponse>

/// <summary>
/// Core implementation of the parallel pattern for service operations that should collect responses to each request
/// </summary>
Expand All @@ -226,12 +226,12 @@ private IEnumerable<TResponse> ExecuteOperationWithResponse<TRequest, TResponse>

// Inline method for initializing a new discovery service channel
Func<ManagedTokenDiscoveryServiceProxy> proxyInit = () =>
{
var proxy = this.ServiceManager.GetProxy();
proxy.SetProxyOptions(options);
{
var proxy = this.ServiceManager.GetProxy();
proxy.SetProxyOptions(options);

return proxy;
};
return proxy;
};

using (var threadLocalProxy = new ThreadLocal<ManagedTokenDiscoveryServiceProxy>(proxyInit, true))
{
Expand All @@ -240,7 +240,7 @@ private IEnumerable<TResponse> ExecuteOperationWithResponse<TRequest, TResponse>
Parallel.ForEach<TRequest, ParallelDiscoveryOperationContext<TRequest, TResponse>>(requests,
new ParallelOptions() { MaxDegreeOfParallelism = this.MaxDegreeOfParallelism },
() => new ParallelDiscoveryOperationContext<TRequest, TResponse>(threadLocalProxy.Value),
(request, loopState, index, context) =>
(request, loopState, index, context) =>
{
try
{
Expand Down Expand Up @@ -301,13 +301,19 @@ private IEnumerable<TResponse> ExecuteOperationWithResponse<TRequest, TResponse>
/// </remarks>
public class ParallelOrganizationServiceProxy : ParallelServiceProxy<OrganizationServiceManager>
{
private const int RateLimitExceededErrorCode = -2147015902;
private const int TimeLimitExceededErrorCode = -2147015903;
private const int ConcurrencyLimitExceededErrorCode = -2147015898;

private const int MaxRetries = 3;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this configurable, part of OrganizationServiceProxyOptions, set default to 3 if none provided


#region Constructor(s)

public ParallelOrganizationServiceProxy(OrganizationServiceManager serviceManager)
: base(serviceManager) { }
: base(serviceManager) { }

public ParallelOrganizationServiceProxy(OrganizationServiceManager serviceManager, int maxDegreeOfParallelism)
: base(serviceManager, maxDegreeOfParallelism) { }
: base(serviceManager, maxDegreeOfParallelism) { }

#endregion

Expand Down Expand Up @@ -397,9 +403,9 @@ public IEnumerable<Entity> Create(IEnumerable<Entity> targets, OrganizationServi
{
return this.ExecuteOperationWithResponse<Entity, Entity>(targets, options,
(target, context) =>
{
{
target.Id = context.Local.Create(target); //Hydrate target with response Id

//Collect the result from each iteration in this partition
context.Results.Add(target);
},
Expand Down Expand Up @@ -513,7 +519,7 @@ public void Associate(IEnumerable<AssociateRequest> requests, OrganizationServic
/// <param name="errorHandler">An optional error handling operation. Handler will be passed the request that failed along with the corresponding <see cref="FaultException{OrganizationServiceFault}"/></param>
/// <exception cref="AggregateException">callers should catch <see cref="AggregateException"/> to handle exceptions raised by individual requests</exception>
public void Disassociate(IEnumerable<DisassociateRequest> requests, Action<DisassociateRequest, FaultException<OrganizationServiceFault>> errorHandler = null)
{
{
this.Disassociate(requests, new OrganizationServiceProxyOptions(), errorHandler);
}

Expand Down Expand Up @@ -664,7 +670,7 @@ public IDictionary<string, EntityCollection> RetrieveMultiple(IDictionary<string
},
errorHandler)
.ToDictionary(r => r.Key, r => r.Value);
}
}

#endregion

Expand Down Expand Up @@ -836,16 +842,16 @@ private void ExecuteOperation<TRequest>(IEnumerable<TRequest> requests, Organiza
Action<TRequest, ManagedTokenOrganizationServiceProxy> operation, Action<TRequest, FaultException<OrganizationServiceFault>> errorHandler)
{
var allFailures = new ConcurrentBag<ParallelOrganizationOperationFailure<TRequest>>();

// Inline method for initializing a new organization service channel
Func<ManagedTokenOrganizationServiceProxy> proxyInit = () =>
{
var proxy = this.ServiceManager.GetProxy();
proxy.SetProxyOptions(options);
{
var proxy = this.ServiceManager.GetProxy();
proxy.SetProxyOptions(options);

return proxy;
};

return proxy;
};

using (var threadLocalProxy = new ThreadLocal<ManagedTokenOrganizationServiceProxy>(proxyInit, true))
{
try
Expand All @@ -855,20 +861,22 @@ private void ExecuteOperation<TRequest>(IEnumerable<TRequest> requests, Organiza
() => new ParallelOrganizationOperationContext<TRequest, bool>(),
(request, loopState, index, context) =>
{
try
{
operation(request, threadLocalProxy.Value);
}
catch (FaultException<OrganizationServiceFault> fault)
var retryCount = 0;
while (true)
{
// Track faults locally
if (errorHandler != null)
try
{
context.Failures.Add(new ParallelOrganizationOperationFailure<TRequest>(request, fault));
operation(request, threadLocalProxy.Value);
}
else
catch (FaultException<OrganizationServiceFault> e) when (IsTransientError(e) && ++retryCount < MaxRetries)
{
throw;
ApplyDelay(e, retryCount);
}
catch (FaultException<OrganizationServiceFault> fault) when (errorHandler != null)
{
// Track faults locally
context.Failures.Add(new ParallelOrganizationOperationFailure<TRequest>(request, fault));
break;
}
}

Expand All @@ -891,7 +899,7 @@ private void ExecuteOperation<TRequest>(IEnumerable<TRequest> requests, Organiza
{
foreach (var failure in allFailures)
{
errorHandler(failure.Request, failure.Exception);
errorHandler(failure.Request, failure.Exception);
}
}
}
Expand All @@ -917,13 +925,13 @@ private IEnumerable<TResponse> ExecuteOperationWithResponse<TRequest, TResponse>

// Inline method for initializing a new organization service channel
Func<ManagedTokenOrganizationServiceProxy> proxyInit = () =>
{
var proxy = this.ServiceManager.GetProxy();
proxy.SetProxyOptions(options);
{
var proxy = this.ServiceManager.GetProxy();
proxy.SetProxyOptions(options);

return proxy;
};

return proxy;
};

using (var threadLocalProxy = new ThreadLocal<ManagedTokenOrganizationServiceProxy>(proxyInit, true))
{
try
Expand All @@ -933,30 +941,32 @@ private IEnumerable<TResponse> ExecuteOperationWithResponse<TRequest, TResponse>
() => new ParallelOrganizationOperationContext<TRequest, TResponse>(threadLocalProxy.Value),
(request, loopState, index, context) =>
{
try
{
coreOperation(request, context);
}
catch (FaultException<OrganizationServiceFault> fault)
var retryCount = 0;
while (true)
{
// Track faults locally
if (errorHandler != null)
try
{
context.Failures.Add(new ParallelOrganizationOperationFailure<TRequest>(request, fault));
coreOperation(request, context);
}
else
catch (FaultException<OrganizationServiceFault> e) when (IsTransientError(e) && ++retryCount < MaxRetries)
{
throw;
ApplyDelay(e, retryCount);
}
catch (FaultException<OrganizationServiceFault> fault) when (errorHandler != null)
{
// Track faults locally
context.Failures.Add(new ParallelOrganizationOperationFailure<TRequest>(request, fault));
break;
}
}

return context;
},
(context) =>
{
{
// Join results and faults together
Array.ForEach(context.Results.ToArray(), r => allResponses.Add(r));
Array.ForEach(context.Failures.ToArray(), f => allFailures.Add(f));
Array.ForEach(context.Failures.ToArray(), f => allFailures.Add(f));

// Remove temporary reference to ThreadLocal proxy
context.Local = null;
Expand All @@ -971,15 +981,41 @@ private IEnumerable<TResponse> ExecuteOperationWithResponse<TRequest, TResponse>
// Handle faults
if (errorHandler != null)
{
foreach(var failure in allFailures)
foreach (var failure in allFailures)
{
errorHandler(failure.Request, failure.Exception);
errorHandler(failure.Request, failure.Exception);
}
}

return allResponses;
}


private static void ApplyDelay(FaultException<OrganizationServiceFault> e, int retryCount)
{
TimeSpan delay;
if (e.Detail.ErrorCode == RateLimitExceededErrorCode)
{
// Use Retry-After delay when specified
delay = (TimeSpan)e.Detail.ErrorDetails["Retry-After"];

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add a check here to see if "Retry-After" exists in the ErrorDetails, as this isn't always the case. There are different types of RateLimits represented by the same RateLimitExceededErrorCode, so don't assume. I already ran into this issue.

}
else
{
// else use exponential backoff delay
delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
}

Thread.Sleep(delay);
}

private static bool IsTransientError(FaultException<OrganizationServiceFault> ex)
{
// You can add more transient fault codes to retry here
return ex.Detail.ErrorCode == RateLimitExceededErrorCode ||
ex.Detail.ErrorCode == TimeLimitExceededErrorCode ||
ex.Detail.ErrorCode == ConcurrencyLimitExceededErrorCode;
}

#endregion
}

Expand All @@ -990,7 +1026,7 @@ private IEnumerable<TResponse> ExecuteOperationWithResponse<TRequest, TResponse>
public abstract class ParallelServiceProxy<T> : ParallelServiceProxy
where T : XrmServiceManagerBase
{
protected static object syncRoot = new Object();
protected static object syncRoot = new Object();

#region Constructor(s)

Expand All @@ -1014,7 +1050,7 @@ protected ParallelServiceProxy(T serviceManager, int maxDegreeOfParallelism)
#endregion

#region Properties

protected T ServiceManager { get; set; }


Expand Down
2 changes: 1 addition & 1 deletion v8/Microsoft.Pfe.Xrm.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.CrmSdk.CoreAssemblies" Version="9.0.0.7" />
<PackageReference Include="Microsoft.CrmSdk.CoreAssemblies" Version="9.0.2.12" />
</ItemGroup>
<ItemGroup>
<Reference Include="System.ServiceModel" />
Expand Down