From 8a8b16a6032acf2db0a02cd64ec00376be88e08c Mon Sep 17 00:00:00 2001 From: Christophe Gondouin <14264672+cgoconseils@users.noreply.github.com> Date: Sat, 1 Jun 2019 10:32:41 +0200 Subject: [PATCH 1/2] Updating referenced Assembly version --- v8/Microsoft.Pfe.Xrm.Core.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v8/Microsoft.Pfe.Xrm.Core.csproj b/v8/Microsoft.Pfe.Xrm.Core.csproj index f08fa27..395bac7 100644 --- a/v8/Microsoft.Pfe.Xrm.Core.csproj +++ b/v8/Microsoft.Pfe.Xrm.Core.csproj @@ -35,7 +35,7 @@ true - + From a913a2d68e13f9a39d8b50ae6fe1e4ac268a13d0 Mon Sep 17 00:00:00 2001 From: Christophe Gondouin <14264672+cgoconseils@users.noreply.github.com> Date: Sat, 1 Jun 2019 10:32:53 +0200 Subject: [PATCH 2/2] Adding Transient error support --- v8/Client/ParallelServiceProxy.cs | 144 +++++++++++++++++++----------- 1 file changed, 90 insertions(+), 54 deletions(-) diff --git a/v8/Client/ParallelServiceProxy.cs b/v8/Client/ParallelServiceProxy.cs index b3dabda..0ada83e 100644 --- a/v8/Client/ParallelServiceProxy.cs +++ b/v8/Client/ParallelServiceProxy.cs @@ -126,7 +126,7 @@ public IDictionary Execute(IDictionary r.Key, r => r.Value); } - + /// /// Performs data parallelism on a collection of type to execute .Execute() requests concurrently /// @@ -205,7 +205,7 @@ public IEnumerable Execute(IEnumerable #endregion #region Core Parallel Execution Method - + /// /// Core implementation of the parallel pattern for service operations that should collect responses to each request /// @@ -226,12 +226,12 @@ private IEnumerable ExecuteOperationWithResponse // Inline method for initializing a new discovery service channel Func proxyInit = () => - { - var proxy = this.ServiceManager.GetProxy(); - proxy.SetProxyOptions(options); + { + var proxy = this.ServiceManager.GetProxy(); + proxy.SetProxyOptions(options); - return proxy; - }; + return proxy; + }; using (var threadLocalProxy = new ThreadLocal(proxyInit, true)) { @@ -240,7 +240,7 @@ private IEnumerable ExecuteOperationWithResponse Parallel.ForEach>(requests, new ParallelOptions() { MaxDegreeOfParallelism = this.MaxDegreeOfParallelism }, () => new ParallelDiscoveryOperationContext(threadLocalProxy.Value), - (request, loopState, index, context) => + (request, loopState, index, context) => { try { @@ -301,13 +301,19 @@ private IEnumerable ExecuteOperationWithResponse /// public class ParallelOrganizationServiceProxy : ParallelServiceProxy { + private const int RateLimitExceededErrorCode = -2147015902; + private const int TimeLimitExceededErrorCode = -2147015903; + private const int ConcurrencyLimitExceededErrorCode = -2147015898; + + private const int MaxRetries = 3; + #region Constructor(s) public ParallelOrganizationServiceProxy(OrganizationServiceManager serviceManager) - : base(serviceManager) { } + : base(serviceManager) { } public ParallelOrganizationServiceProxy(OrganizationServiceManager serviceManager, int maxDegreeOfParallelism) - : base(serviceManager, maxDegreeOfParallelism) { } + : base(serviceManager, maxDegreeOfParallelism) { } #endregion @@ -397,9 +403,9 @@ public IEnumerable Create(IEnumerable targets, OrganizationServi { return this.ExecuteOperationWithResponse(targets, options, (target, context) => - { + { target.Id = context.Local.Create(target); //Hydrate target with response Id - + //Collect the result from each iteration in this partition context.Results.Add(target); }, @@ -513,7 +519,7 @@ public void Associate(IEnumerable requests, OrganizationServic /// An optional error handling operation. Handler will be passed the request that failed along with the corresponding /// callers should catch to handle exceptions raised by individual requests public void Disassociate(IEnumerable requests, Action> errorHandler = null) - { + { this.Disassociate(requests, new OrganizationServiceProxyOptions(), errorHandler); } @@ -664,7 +670,7 @@ public IDictionary RetrieveMultiple(IDictionary r.Key, r => r.Value); - } + } #endregion @@ -836,16 +842,16 @@ private void ExecuteOperation(IEnumerable requests, Organiza Action operation, Action> errorHandler) { var allFailures = new ConcurrentBag>(); - + // Inline method for initializing a new organization service channel Func proxyInit = () => - { - var proxy = this.ServiceManager.GetProxy(); - proxy.SetProxyOptions(options); + { + var proxy = this.ServiceManager.GetProxy(); + proxy.SetProxyOptions(options); + + return proxy; + }; - return proxy; - }; - using (var threadLocalProxy = new ThreadLocal(proxyInit, true)) { try @@ -855,20 +861,22 @@ private void ExecuteOperation(IEnumerable requests, Organiza () => new ParallelOrganizationOperationContext(), (request, loopState, index, context) => { - try - { - operation(request, threadLocalProxy.Value); - } - catch (FaultException fault) + var retryCount = 0; + while (true) { - // Track faults locally - if (errorHandler != null) + try { - context.Failures.Add(new ParallelOrganizationOperationFailure(request, fault)); + operation(request, threadLocalProxy.Value); } - else + catch (FaultException e) when (IsTransientError(e) && ++retryCount < MaxRetries) { - throw; + ApplyDelay(e, retryCount); + } + catch (FaultException fault) when (errorHandler != null) + { + // Track faults locally + context.Failures.Add(new ParallelOrganizationOperationFailure(request, fault)); + break; } } @@ -891,7 +899,7 @@ private void ExecuteOperation(IEnumerable requests, Organiza { foreach (var failure in allFailures) { - errorHandler(failure.Request, failure.Exception); + errorHandler(failure.Request, failure.Exception); } } } @@ -917,13 +925,13 @@ private IEnumerable ExecuteOperationWithResponse // Inline method for initializing a new organization service channel Func proxyInit = () => - { - var proxy = this.ServiceManager.GetProxy(); - proxy.SetProxyOptions(options); + { + var proxy = this.ServiceManager.GetProxy(); + proxy.SetProxyOptions(options); + + return proxy; + }; - return proxy; - }; - using (var threadLocalProxy = new ThreadLocal(proxyInit, true)) { try @@ -933,30 +941,32 @@ private IEnumerable ExecuteOperationWithResponse () => new ParallelOrganizationOperationContext(threadLocalProxy.Value), (request, loopState, index, context) => { - try - { - coreOperation(request, context); - } - catch (FaultException fault) + var retryCount = 0; + while (true) { - // Track faults locally - if (errorHandler != null) + try { - context.Failures.Add(new ParallelOrganizationOperationFailure(request, fault)); + coreOperation(request, context); } - else + catch (FaultException e) when (IsTransientError(e) && ++retryCount < MaxRetries) { - throw; + ApplyDelay(e, retryCount); + } + catch (FaultException fault) when (errorHandler != null) + { + // Track faults locally + context.Failures.Add(new ParallelOrganizationOperationFailure(request, fault)); + break; } } return context; }, (context) => - { + { // Join results and faults together Array.ForEach(context.Results.ToArray(), r => allResponses.Add(r)); - Array.ForEach(context.Failures.ToArray(), f => allFailures.Add(f)); + Array.ForEach(context.Failures.ToArray(), f => allFailures.Add(f)); // Remove temporary reference to ThreadLocal proxy context.Local = null; @@ -971,15 +981,41 @@ private IEnumerable ExecuteOperationWithResponse // Handle faults if (errorHandler != null) { - foreach(var failure in allFailures) + foreach (var failure in allFailures) { - errorHandler(failure.Request, failure.Exception); + errorHandler(failure.Request, failure.Exception); } } return allResponses; } + + private static void ApplyDelay(FaultException e, int retryCount) + { + TimeSpan delay; + if (e.Detail.ErrorCode == RateLimitExceededErrorCode) + { + // Use Retry-After delay when specified + delay = (TimeSpan)e.Detail.ErrorDetails["Retry-After"]; + } + else + { + // else use exponential backoff delay + delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount)); + } + + Thread.Sleep(delay); + } + + private static bool IsTransientError(FaultException ex) + { + // You can add more transient fault codes to retry here + return ex.Detail.ErrorCode == RateLimitExceededErrorCode || + ex.Detail.ErrorCode == TimeLimitExceededErrorCode || + ex.Detail.ErrorCode == ConcurrencyLimitExceededErrorCode; + } + #endregion } @@ -990,7 +1026,7 @@ private IEnumerable ExecuteOperationWithResponse public abstract class ParallelServiceProxy : ParallelServiceProxy where T : XrmServiceManagerBase { - protected static object syncRoot = new Object(); + protected static object syncRoot = new Object(); #region Constructor(s) @@ -1014,7 +1050,7 @@ protected ParallelServiceProxy(T serviceManager, int maxDegreeOfParallelism) #endregion #region Properties - + protected T ServiceManager { get; set; }