Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion dotnet/src/Microsoft.Agents.AI.Purview/BackgroundJobRunner.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Purview.Models.Common;
using Microsoft.Agents.AI.Purview.Models.Jobs;
using Microsoft.Agents.AI.Purview.Models.Requests;
using Microsoft.Agents.AI.Purview.Models.Responses;
using Microsoft.Extensions.Logging;

namespace Microsoft.Agents.AI.Purview;
Expand All @@ -16,19 +20,22 @@ internal sealed class BackgroundJobRunner : IBackgroundJobRunner
{
private readonly IChannelHandler _channelHandler;
private readonly IPurviewClient _purviewClient;
private readonly ICacheProvider _cacheProvider;
private readonly ILogger _logger;

/// <summary>
/// Initializes a new instance of the <see cref="BackgroundJobRunner"/> class.
/// </summary>
/// <param name="channelHandler">The channel handler used to manage job channels.</param>
/// <param name="purviewClient">The Purview client used to send requests to Purview.</param>
/// <param name="cacheProvider">The cache provider used to store protection scopes results.</param>
/// <param name="logger">The logger used to log information about background jobs.</param>
/// <param name="purviewSettings">The settings used to configure Purview client behavior.</param>
public BackgroundJobRunner(IChannelHandler channelHandler, IPurviewClient purviewClient, ILogger logger, PurviewSettings purviewSettings)
public BackgroundJobRunner(IChannelHandler channelHandler, IPurviewClient purviewClient, ICacheProvider cacheProvider, ILogger logger, PurviewSettings purviewSettings)
{
this._channelHandler = channelHandler;
this._purviewClient = purviewClient;
this._cacheProvider = cacheProvider;
this._logger = logger;

for (int i = 0; i < purviewSettings.MaxConcurrentJobConsumers; i++)
Expand Down Expand Up @@ -67,6 +74,28 @@ private async Task RunJobAsync(BackgroundJobBase job)
break;
case ContentActivityJob contentActivityJob:
_ = await this._purviewClient.SendContentActivitiesAsync(contentActivityJob.Request, CancellationToken.None).ConfigureAwait(false);
break;
case ScopeRetrievalJob scopeRetrievalJob:
try
{
ProtectionScopesResponse response = await this._purviewClient.GetProtectionScopesAsync(scopeRetrievalJob.Request, CancellationToken.None).ConfigureAwait(false);
await this._cacheProvider.SetAsync(scopeRetrievalJob.CacheKey, response, CancellationToken.None).ConfigureAwait(false);
(bool shouldProcess, List<DlpActionInfo> _, ExecutionMode _) = ScopedContentProcessor.CheckApplicableScopes(scopeRetrievalJob.ProcessContentRequest, response);
if (!shouldProcess)
{
ProcessContentRequest pcRequest = scopeRetrievalJob.ProcessContentRequest;
ContentActivitiesRequest caRequest = new(pcRequest.UserId, pcRequest.TenantId, pcRequest.ContentToProcess, pcRequest.CorrelationId);
this._channelHandler.QueueJob(new ContentActivityJob(caRequest));
}
}
catch (PurviewPaymentRequiredException ex)
{
await this._cacheProvider.SetAsync(
new PaymentRequiredCacheKey(scopeRetrievalJob.Request.TenantId),
new PaymentRequiredCacheEntry(ex.Message),
CancellationToken.None).ConfigureAwait(false);
}

break;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Agents.AI.Purview.Models.Common;

/// <summary>
/// Cached tenant-level payment required state.
/// </summary>
internal sealed class PaymentRequiredCacheEntry
{
/// <summary>
/// Creates a new instance of <see cref="PaymentRequiredCacheEntry"/>.
/// </summary>
/// <param name="message">The payment required error message.</param>
public PaymentRequiredCacheEntry(string? message)
{
this.Message = message;
}

/// <summary>
/// The payment required error message.
/// </summary>
public string? Message { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft. All rights reserved.

namespace Microsoft.Agents.AI.Purview.Models.Common;

/// <summary>
/// A cache key for tenant-level payment required state.
/// </summary>
internal sealed class PaymentRequiredCacheKey
{
/// <summary>
/// Creates a new instance of <see cref="PaymentRequiredCacheKey"/>.
/// </summary>
/// <param name="tenantId">The id of the tenant.</param>
public PaymentRequiredCacheKey(string tenantId)
{
this.TenantId = tenantId;
}

/// <summary>
/// The id of the tenant.
/// </summary>
public string TenantId { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) Microsoft. All rights reserved.

using Microsoft.Agents.AI.Purview.Models.Common;
using Microsoft.Agents.AI.Purview.Models.Requests;

namespace Microsoft.Agents.AI.Purview.Models.Jobs;

/// <summary>
/// Class representing a job that refreshes the protection scopes cache in the background.
/// </summary>
/// <remarks>
/// Used by the parallel protection scopes retrieval path to warm the cache without blocking the
/// foreground ProcessContent call.
/// </remarks>
internal sealed class ScopeRetrievalJob : BackgroundJobBase
{
/// <summary>
/// Initializes a new instance of the <see cref="ScopeRetrievalJob"/> class.
/// </summary>
/// <param name="request">The protection scopes request to send to Purview.</param>
/// <param name="cacheKey">The cache key used to store the response.</param>
/// <param name="processContentRequest">The original process content request that triggered scope retrieval.</param>
public ScopeRetrievalJob(ProtectionScopesRequest request, ProtectionScopesCacheKey cacheKey, ProcessContentRequest processContentRequest)
{
this.Request = request;
this.CacheKey = cacheKey;
this.ProcessContentRequest = processContentRequest;
}

/// <summary>
/// Gets the protection scopes request.
/// </summary>
public ProtectionScopesRequest Request { get; }

/// <summary>
/// Gets the cache key used to store the response.
/// </summary>
public ProtectionScopesCacheKey CacheKey { get; }

/// <summary>
/// Gets the original process content request that triggered scope retrieval.
/// </summary>
public ProcessContentRequest ProcessContentRequest { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,10 @@ public ProcessContentRequest(ContentToProcess contentToProcess, string userId, s
/// </summary>
[JsonIgnore]
internal string? ScopeIdentifier { get; set; }

/// <summary>
/// Indicates whether the ProcessContent request should ask the service for inline evaluation.
/// </summary>
[JsonIgnore]
internal bool ProcessInline { get; set; }
}
5 changes: 5 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Purview/PurviewClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public async Task<ProcessContentResponse> ProcessContentAsync(ProcessContentRequ
message.Headers.Add("If-None-Match", request.ScopeIdentifier);
}

if (request.ProcessInline)
{
message.Headers.Add("Prefer", "evaluateInline");
}

string content = JsonSerializer.Serialize(request, PurviewSerializationUtils.SerializationSettings.GetTypeInfo(typeof(ProcessContentRequest)));
message.Content = new StringContent(content, Encoding.UTF8, "application/json");

Expand Down
4 changes: 2 additions & 2 deletions dotnet/src/Microsoft.Agents.AI.Purview/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ The policy logic is identical; the only difference is the hook point in the pipe

The user id from the prompt message(s) is reused for the response evaluation so both evaluations map consistently to the same user.

There are several optimizations to speed up Purview calls. Protection scope lookups (the first step in evaluation) are cached to minimize network calls.
If the policies allow content to be processed offline, the middleware will add the process content request to a channel and run it in a background worker. Similarly, the middleware will run a background request if no scopes apply and the interaction only has to be logged in Audit.
There are several optimizations to speed up Purview calls. Protection scope lookups (the first step in evaluation) are cached to minimize network calls. When a lookup is not cached, the middleware will refresh it in a background worker so the foreground ProcessContent request does not have to wait.
If the policies allow content to be processed offline, the middleware will add the process content request to a channel and run it in a background worker. Similarly, the middleware will run a background request if no scopes apply and the interaction only has to be logged in Audit. Payment Required responses from background scope lookups are cached at the tenant level so subsequent requests for the tenant short-circuit.

## Exceptions
| Exception | Scenario |
Expand Down
98 changes: 78 additions & 20 deletions dotnet/src/Microsoft.Agents.AI.Purview/ScopedContentProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Purview.Models.Common;
Expand Down Expand Up @@ -193,43 +194,60 @@ private async Task<ProcessContentResponse> ProcessContentWithProtectionScopesAsy
{
ProtectionScopesRequest psRequest = CreateProtectionScopesRequest(pcRequest, pcRequest.UserId, pcRequest.TenantId, pcRequest.CorrelationId);

PaymentRequiredCacheEntry? cachedPaymentRequired = await this._cacheProvider.GetAsync<PaymentRequiredCacheKey, PaymentRequiredCacheEntry>(
new PaymentRequiredCacheKey(pcRequest.TenantId),
cancellationToken).ConfigureAwait(false);

if (cachedPaymentRequired != null)
{
throw new PurviewPaymentRequiredException(cachedPaymentRequired.Message ?? "Payment required");
}

ProtectionScopesCacheKey cacheKey = new(psRequest);

ProtectionScopesResponse? cacheResponse = await this._cacheProvider.GetAsync<ProtectionScopesCacheKey, ProtectionScopesResponse>(cacheKey, cancellationToken).ConfigureAwait(false);

ProtectionScopesResponse psResponse;

if (cacheResponse != null)
{
psResponse = cacheResponse;
return await this.ProcessWithCachedScopesAsync(pcRequest, cacheResponse, cacheKey, cancellationToken).ConfigureAwait(false);
}
else

try
{
psResponse = await this._purviewClient.GetProtectionScopesAsync(psRequest, cancellationToken).ConfigureAwait(false);
await this._cacheProvider.SetAsync(cacheKey, psResponse, cancellationToken).ConfigureAwait(false);
this._channelHandler.QueueJob(new ScopeRetrievalJob(psRequest, cacheKey, pcRequest));
}
Comment thread
taisirhassan marked this conversation as resolved.
catch (PurviewJobException)
{
// QueueJob already logs failures. Scope warmup is best effort; don't block ProcessContent.
}

Comment thread
taisirhassan marked this conversation as resolved.
return await this.CallProcessContentAsync(pcRequest, cacheKey, dlpActions: null, cancellationToken).ConfigureAwait(false);
Comment thread
taisirhassan marked this conversation as resolved.
Comment thread
taisirhassan marked this conversation as resolved.
}

/// <summary>
/// Apply locally-cached protection scopes to the request and dispatch ProcessContent appropriately.
/// </summary>
private async Task<ProcessContentResponse> ProcessWithCachedScopesAsync(
ProcessContentRequest pcRequest,
ProtectionScopesResponse psResponse,
ProtectionScopesCacheKey cacheKey,
CancellationToken cancellationToken)
{
pcRequest.ScopeIdentifier = psResponse.ScopeIdentifier;

(bool shouldProcess, List<DlpActionInfo> dlpActions, ExecutionMode executionMode) = CheckApplicableScopes(pcRequest, psResponse);

if (shouldProcess)
{
pcRequest.ProcessInline = executionMode == ExecutionMode.EvaluateInline;

if (executionMode == ExecutionMode.EvaluateOffline)
{
this._channelHandler.QueueJob(new ProcessContentJob(pcRequest));
return new ProcessContentResponse();
}

ProcessContentResponse pcResponse = await this._purviewClient.ProcessContentAsync(pcRequest, cancellationToken).ConfigureAwait(false);

if (pcResponse.ProtectionScopeState == ProtectionScopeState.Modified)
{
await this._cacheProvider.RemoveAsync(cacheKey, cancellationToken).ConfigureAwait(false);
}

pcResponse = CombinePolicyActions(pcResponse, dlpActions);
return pcResponse;
return await this.CallProcessContentAsync(pcRequest, cacheKey, dlpActions, cancellationToken).ConfigureAwait(false);
}

ContentActivitiesRequest caRequest = new(pcRequest.UserId, pcRequest.TenantId, pcRequest.ContentToProcess, pcRequest.CorrelationId);
Expand All @@ -238,6 +256,30 @@ private async Task<ProcessContentResponse> ProcessContentWithProtectionScopesAsy
return new ProcessContentResponse();
}

/// <summary>
/// Call ProcessContent and invalidate the protection scopes cache when the response indicates the cached scopes are stale.
/// </summary>
private async Task<ProcessContentResponse> CallProcessContentAsync(
ProcessContentRequest pcRequest,
ProtectionScopesCacheKey cacheKey,
List<DlpActionInfo>? dlpActions,
CancellationToken cancellationToken)
{
ProcessContentResponse pcResponse = await this._purviewClient.ProcessContentAsync(pcRequest, cancellationToken).ConfigureAwait(false);

if (pcRequest.ScopeIdentifier != null && pcResponse.ProtectionScopeState == ProtectionScopeState.Modified)
{
await this._cacheProvider.RemoveAsync(cacheKey, cancellationToken).ConfigureAwait(false);
}

if (dlpActions?.Count > 0)
{
pcResponse = CombinePolicyActions(pcResponse, dlpActions);
}

return pcResponse;
}

/// <summary>
/// Dedupe policy actions received from the service.
/// </summary>
Expand All @@ -248,9 +290,21 @@ private static ProcessContentResponse CombinePolicyActions(ProcessContentRespons
{
if (actionInfos?.Count > 0)
{
pcResponse.PolicyActions = pcResponse.PolicyActions is null ?
actionInfos :
[.. pcResponse.PolicyActions, .. actionInfos];
List<DlpActionInfo> combinedActions = [];
HashSet<(DlpAction Action, RestrictionAction? RestrictionAction)> seenActions = [];
IEnumerable<DlpActionInfo> allActions = pcResponse.PolicyActions is null
? actionInfos
: pcResponse.PolicyActions.Concat(actionInfos);

foreach (DlpActionInfo actionInfo in allActions)
{
if (seenActions.Add((actionInfo.Action, actionInfo.RestrictionAction)))
{
combinedActions.Add(actionInfo);
}
}

pcResponse.PolicyActions = combinedActions;
}

return pcResponse;
Expand All @@ -262,7 +316,7 @@ private static ProcessContentResponse CombinePolicyActions(ProcessContentRespons
/// <param name="pcRequest">The process content request.</param>
/// <param name="psResponse">The protection scopes response that was returned for the process content request.</param>
/// <returns>A bool indicating if the content needs to be processed. A list of applicable actions from the scopes response, and the execution mode for the process content request.</returns>
private static (bool shouldProcess, List<DlpActionInfo> dlpActions, ExecutionMode executionMode) CheckApplicableScopes(ProcessContentRequest pcRequest, ProtectionScopesResponse psResponse)
internal static (bool shouldProcess, List<DlpActionInfo> dlpActions, ExecutionMode executionMode) CheckApplicableScopes(ProcessContentRequest pcRequest, ProtectionScopesResponse psResponse)
{
ProtectionScopeActivities requestActivity = TranslateActivity(pcRequest.ContentToProcess.ActivityMetadata.Activity);

Expand All @@ -284,7 +338,11 @@ private static (bool shouldProcess, List<DlpActionInfo> dlpActions, ExecutionMod

foreach (var location in scope.Locations ?? Array.Empty<PolicyLocation>())
{
locationMatch = location.DataType.EndsWith(locationType, StringComparison.OrdinalIgnoreCase) && location.Value.Equals(locationValue, StringComparison.OrdinalIgnoreCase);
if (location.DataType.EndsWith(locationType, StringComparison.OrdinalIgnoreCase) && location.Value.Equals(locationValue, StringComparison.OrdinalIgnoreCase))
{
locationMatch = true;
break;
}
}

if (activityMatch && locationMatch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ namespace Microsoft.Agents.AI.Purview.Serialization;
[JsonSerializable(typeof(ContentActivitiesRequest))]
[JsonSerializable(typeof(ContentActivitiesResponse))]
[JsonSerializable(typeof(ProtectionScopesCacheKey))]
[JsonSerializable(typeof(PaymentRequiredCacheKey))]
[JsonSerializable(typeof(PaymentRequiredCacheEntry))]
internal sealed partial class SourceGenerationContext : JsonSerializerContext;

/// <summary>
Expand Down
Loading
Loading