Skip to content

Commit 5094781

Browse files
YunchuWangCopilot
andcommitted
Split on-demand sandbox client modules
Move declaration helpers and gRPC transport out of the public management client module, and rename the worker transport to OnDemandSandboxActivitiesGrpcTransport. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent e4b1b5a commit 5094781

6 files changed

Lines changed: 373 additions & 343 deletions

File tree

durabletask-azuremanaged/durabletask/azuremanaged/preview/on_demand_sandbox/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
"""
1717

1818
from durabletask.azuremanaged.preview.on_demand_sandbox.client import OnDemandSandboxActivitiesClient
19-
from durabletask.azuremanaged.preview.on_demand_sandbox.client import OnDemandSandboxWorkerProfile
20-
from durabletask.azuremanaged.preview.on_demand_sandbox.client import OnDemandSandboxWorkerProfileOptions
21-
from durabletask.azuremanaged.preview.on_demand_sandbox.client import on_demand_sandbox_worker_profile
19+
from durabletask.azuremanaged.preview.on_demand_sandbox.declarations import OnDemandSandboxWorkerProfile
20+
from durabletask.azuremanaged.preview.on_demand_sandbox.declarations import OnDemandSandboxWorkerProfileOptions
21+
from durabletask.azuremanaged.preview.on_demand_sandbox.declarations import on_demand_sandbox_worker_profile
2222
from durabletask.azuremanaged.preview.on_demand_sandbox.worker import OnDemandSandboxWorker
2323

2424
__all__ = [
Lines changed: 10 additions & 332 deletions
Original file line numberDiff line numberDiff line change
@@ -1,289 +1,21 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4-
from dataclasses import dataclass, field
5-
from decimal import Decimal, InvalidOperation
6-
from typing import Callable, Iterable, Optional, Sequence
4+
from typing import Optional, Sequence
75

86
import grpc
97
from azure.core.credentials import TokenCredential
108

11-
from durabletask import task
12-
from durabletask.azuremanaged.internal.durabletask_grpc_interceptor import (
13-
DTSDefaultClientInterceptorImpl,
9+
from durabletask.azuremanaged.preview.on_demand_sandbox.declarations import (
10+
build_profile_on_demand_sandbox_activity_declarations,
11+
)
12+
from durabletask.azuremanaged.preview.on_demand_sandbox.transport import (
13+
OnDemandSandboxActivitiesGrpcTransport,
1414
)
15-
from durabletask.azuremanaged.internal import on_demand_sandbox_activities_service_pb2 as pb
16-
from durabletask.azuremanaged.internal import on_demand_sandbox_activities_service_pb2_grpc as stubs
1715
from durabletask.grpc_options import GrpcChannelOptions
1816
import durabletask.internal.shared as shared
1917

2018

21-
DEFAULT_WORKER_PROFILE_ID = "default"
22-
DEFAULT_CPU = "1000m"
23-
DEFAULT_MEMORY = "2048Mi"
24-
DEFAULT_MAX_CONCURRENT_ACTIVITIES = 100
25-
26-
27-
@dataclass
28-
class OnDemandSandboxWorkerProfileOptions:
29-
"""Options for a decorated on-demand sandbox worker profile."""
30-
31-
worker_profile_id: str
32-
# Full OCI image reference for the sandbox worker container, for example
33-
# "myregistry.azurecr.io/workers/hello:1.0" or
34-
# "myregistry.azurecr.io/workers/hello@sha256:0123456789abcdef...".
35-
container_image: Optional[str] = None
36-
image_pull_managed_identity_client_id: Optional[str] = None
37-
scheduler_managed_identity_client_id: Optional[str] = None
38-
cpu: str = DEFAULT_CPU
39-
memory: str = DEFAULT_MEMORY
40-
environment_variables: dict[str, str] = field(default_factory=dict)
41-
max_concurrent_activities: int = DEFAULT_MAX_CONCURRENT_ACTIVITIES
42-
entrypoint: list[str] = field(default_factory=list)
43-
cmd: list[str] = field(default_factory=list)
44-
activity_names: list[str] = field(default_factory=list)
45-
46-
def add_activity(self, activity: str | Callable) -> None:
47-
"""Add an activity to the on-demand sandbox worker profile declaration."""
48-
activity_name = task.get_name(activity) if callable(activity) else activity
49-
self.activity_names.append(
50-
_normalize_required(activity_name, "On-demand sandbox activity name is required."))
51-
52-
53-
class OnDemandSandboxWorkerProfile:
54-
"""Base class for configuring a decorated on-demand sandbox worker profile."""
55-
56-
def configure(self, options: OnDemandSandboxWorkerProfileOptions) -> None:
57-
"""Configure the on-demand sandbox worker profile declaration options."""
58-
59-
60-
_worker_profiles: dict[str, OnDemandSandboxWorkerProfileOptions] = {}
61-
62-
63-
def on_demand_sandbox_worker_profile(worker_profile_id: str) -> Callable[[type], type]:
64-
"""Declare an on-demand sandbox worker profile using a decorated marker class."""
65-
normalized_profile = _normalize_required(worker_profile_id, "On-demand sandbox worker profile ID is required.")
66-
67-
def decorator(cls: type) -> type:
68-
if normalized_profile in _worker_profiles:
69-
raise ValueError(f"On-demand sandbox worker profile '{normalized_profile}' is declared more than once.")
70-
71-
options = OnDemandSandboxWorkerProfileOptions(worker_profile_id=normalized_profile)
72-
try:
73-
profile = cls()
74-
except TypeError as ex:
75-
raise TypeError("On-demand sandbox worker profile classes must have a parameterless constructor.") from ex
76-
77-
configure = getattr(profile, "configure", None)
78-
if callable(configure):
79-
configure(options)
80-
81-
_worker_profiles[normalized_profile] = options
82-
return cls
83-
84-
return decorator
85-
86-
87-
def resolve_activity_names(activity_names: str | Iterable[str]) -> list[str]:
88-
resolved: list[str] = []
89-
seen: set[str] = set()
90-
names = [activity_names] if isinstance(activity_names, str) else activity_names
91-
for name in names:
92-
normalized = name.strip()
93-
if normalized and normalized not in seen:
94-
resolved.append(normalized)
95-
seen.add(normalized)
96-
return resolved
97-
98-
99-
def build_on_demand_sandbox_activity_declaration(
100-
*,
101-
activity_names: str | Iterable[str],
102-
scheduler_managed_identity_client_id: str,
103-
worker_profile_id: str = DEFAULT_WORKER_PROFILE_ID,
104-
container_image: Optional[str] = None,
105-
image_pull_managed_identity_client_id: Optional[str] = None,
106-
cpu: str = DEFAULT_CPU,
107-
memory: str = DEFAULT_MEMORY,
108-
environment_variables: Optional[dict[str, str]] = None,
109-
max_concurrent_activities: int = DEFAULT_MAX_CONCURRENT_ACTIVITIES,
110-
entrypoint: Optional[Iterable[str]] = None,
111-
cmd: Optional[Iterable[str]] = None) -> pb.OnDemandSandboxActivityDeclaration:
112-
"""Build a sandbox activity declaration.
113-
114-
Args:
115-
container_image: Full OCI image reference for the sandbox worker container,
116-
such as "myregistry.azurecr.io/workers/hello:1.0" or
117-
"myregistry.azurecr.io/workers/hello@sha256:0123456789abcdef...".
118-
"""
119-
resolved_activity_names = resolve_activity_names(activity_names)
120-
if not resolved_activity_names:
121-
raise ValueError("On-demand sandbox activity declaration requires at least one activity name.")
122-
123-
if not worker_profile_id or not worker_profile_id.strip():
124-
raise ValueError("On-demand sandbox activity declaration requires a worker profile ID.")
125-
126-
if max_concurrent_activities <= 0:
127-
raise ValueError("On-demand sandbox activity max concurrent activities must be greater than zero.")
128-
129-
image_ref = _normalize_required(
130-
container_image,
131-
"On-demand sandbox activity image metadata requires a container image reference like "
132-
"'myregistry.azurecr.io/workers/hello:1.0' or "
133-
"'myregistry.azurecr.io/workers/hello@sha256:...'.")
134-
135-
resolved_scheduler_managed_identity_client_id = _normalize_required(
136-
scheduler_managed_identity_client_id,
137-
"On-demand sandbox activity declaration requires the managed identity client ID workers use to connect to Durable Task Scheduler.")
138-
resolved_image_pull_managed_identity_client_id = _normalize_required(
139-
image_pull_managed_identity_client_id,
140-
"On-demand sandbox activity declaration requires the managed identity client ID ADC uses to pull the worker image.")
141-
142-
resolved_cpu = _normalize_cpu(cpu)
143-
resolved_memory = _normalize_memory(memory)
144-
145-
declaration = pb.OnDemandSandboxActivityDeclaration(
146-
worker_profile_id=worker_profile_id.strip(),
147-
image=pb.OnDemandSandboxActivityImage(
148-
image_ref=image_ref,
149-
managed_identity_client_id=resolved_image_pull_managed_identity_client_id),
150-
resources=pb.OnDemandSandboxActivityResources(
151-
cpu=resolved_cpu,
152-
memory=resolved_memory),
153-
scheduler_managed_identity_client_id=resolved_scheduler_managed_identity_client_id,
154-
max_concurrent_activities=max_concurrent_activities)
155-
declaration.activity_names.extend(resolved_activity_names)
156-
declaration.environment_variables.update(environment_variables or {})
157-
declaration.entrypoint.extend(_normalize_optional_strings(entrypoint or []))
158-
declaration.cmd.extend(_normalize_optional_strings(cmd or []))
159-
return declaration
160-
161-
162-
def build_profile_on_demand_sandbox_activity_declarations() -> list[pb.OnDemandSandboxActivityDeclaration]:
163-
"""Build on-demand sandbox declarations from worker profile configuration."""
164-
declarations: list[pb.OnDemandSandboxActivityDeclaration] = []
165-
activity_owners: dict[str, str] = {}
166-
for profile in _worker_profiles.values():
167-
activity_names = resolve_activity_names(profile.activity_names)
168-
if not activity_names:
169-
continue
170-
171-
for activity_name in activity_names:
172-
existing_profile = activity_owners.get(activity_name)
173-
if existing_profile and existing_profile != profile.worker_profile_id:
174-
raise ValueError(
175-
f"On-demand sandbox activity '{activity_name}' is assigned to both worker profile "
176-
f"'{existing_profile}' and '{profile.worker_profile_id}'.")
177-
activity_owners[activity_name] = profile.worker_profile_id
178-
179-
declarations.append(build_on_demand_sandbox_activity_declaration(
180-
activity_names=activity_names,
181-
worker_profile_id=profile.worker_profile_id,
182-
container_image=profile.container_image,
183-
image_pull_managed_identity_client_id=profile.image_pull_managed_identity_client_id,
184-
scheduler_managed_identity_client_id=profile.scheduler_managed_identity_client_id,
185-
cpu=profile.cpu,
186-
memory=profile.memory,
187-
environment_variables=profile.environment_variables,
188-
max_concurrent_activities=profile.max_concurrent_activities,
189-
entrypoint=profile.entrypoint,
190-
cmd=profile.cmd))
191-
192-
return declarations
193-
194-
195-
def build_on_demand_sandbox_worker_start(
196-
*,
197-
taskhub: str,
198-
worker_profile_id: str,
199-
max_activities_count: int,
200-
activity_names: Iterable[str],
201-
substrate: Optional[str] = None,
202-
dts_sandbox_identifier: Optional[str] = None) -> pb.OnDemandSandboxActivityWorkerMessage:
203-
if not taskhub or not taskhub.strip():
204-
raise ValueError("On-demand sandbox activity worker registration requires a task hub name.")
205-
206-
if not worker_profile_id or not worker_profile_id.strip():
207-
raise ValueError("On-demand sandbox activity worker registration requires a worker profile ID.")
208-
209-
if max_activities_count <= 0:
210-
raise ValueError("On-demand sandbox activity worker max activity count must be greater than zero.")
211-
212-
resolved_activity_names = resolve_activity_names(activity_names)
213-
if not resolved_activity_names:
214-
raise ValueError("On-demand sandbox activity worker registration requires at least one registered activity.")
215-
216-
message = pb.OnDemandSandboxActivityWorkerMessage(
217-
start=pb.OnDemandSandboxActivityWorkerStart(
218-
task_hub=taskhub.strip(),
219-
worker_profile_id=worker_profile_id.strip(),
220-
max_activities_count=max_activities_count,
221-
substrate=_parse_substrate(substrate),
222-
dts_sandbox_identifier=(dts_sandbox_identifier or "").strip()))
223-
message.start.activity_names.extend(resolved_activity_names)
224-
return message
225-
226-
227-
def build_on_demand_sandbox_worker_heartbeat(active_activities_count: int) -> pb.OnDemandSandboxActivityWorkerMessage:
228-
if active_activities_count < 0:
229-
raise ValueError("On-demand sandbox activity worker active activity count cannot be negative.")
230-
231-
return pb.OnDemandSandboxActivityWorkerMessage(
232-
heartbeat=pb.OnDemandSandboxActivityWorkerHeartbeat(
233-
active_activities_count=active_activities_count))
234-
235-
236-
class _OnDemandSandboxActivitiesGrpcClient:
237-
"""Internal gRPC client for on-demand sandbox activity RPCs."""
238-
239-
def __init__(
240-
self, *,
241-
host_address: str,
242-
taskhub: str,
243-
token_credential: Optional[TokenCredential],
244-
channel: Optional[grpc.Channel] = None,
245-
secure_channel: bool = True,
246-
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
247-
channel_options: Optional[GrpcChannelOptions] = None):
248-
if not taskhub:
249-
raise ValueError("Taskhub value cannot be empty. Please provide a value for your taskhub")
250-
251-
self._owns_channel = channel is None
252-
if channel is None:
253-
resolved_interceptors: list[shared.ClientInterceptor] = (
254-
list(interceptors) if interceptors is not None else []
255-
)
256-
resolved_interceptors.append(DTSDefaultClientInterceptorImpl(token_credential, taskhub))
257-
channel = shared.get_grpc_channel(
258-
host_address=host_address,
259-
secure_channel=secure_channel,
260-
interceptors=resolved_interceptors,
261-
channel_options=channel_options)
262-
self._channel = channel
263-
self._stub = stubs.OnDemandSandboxActivitiesStub(channel)
264-
265-
def close(self) -> None:
266-
if self._owns_channel:
267-
self._channel.close()
268-
269-
def declare_on_demand_sandbox_activities(
270-
self,
271-
declaration: pb.OnDemandSandboxActivityDeclaration) -> pb.OnDemandSandboxActivityDeclarationResult:
272-
return self._stub.DeclareOnDemandSandboxActivities(declaration)
273-
274-
def remove_on_demand_sandbox_activity_declaration(
275-
self,
276-
worker_profile_id: str) -> pb.RemoveOnDemandSandboxActivityDeclarationResult:
277-
return self._stub.RemoveOnDemandSandboxActivityDeclaration(
278-
pb.RemoveOnDemandSandboxActivityDeclarationRequest(worker_profile_id=worker_profile_id))
279-
280-
def connect_on_demand_sandbox_activity_worker(
281-
self,
282-
messages: Iterable[pb.OnDemandSandboxActivityWorkerMessage]
283-
) -> pb.OnDemandSandboxActivityWorkerSessionResult:
284-
return self._stub.ConnectOnDemandSandboxActivityWorker(messages)
285-
286-
28719
class OnDemandSandboxActivitiesClient:
28820
"""Client for Durable Task Scheduler on-demand sandbox activity management operations."""
28921

@@ -296,7 +28,7 @@ def __init__(
29628
secure_channel: bool = True,
29729
interceptors: Optional[Sequence[shared.ClientInterceptor]] = None,
29830
channel_options: Optional[GrpcChannelOptions] = None):
299-
self._grpc_client = _OnDemandSandboxActivitiesGrpcClient(
31+
self._transport = OnDemandSandboxActivitiesGrpcTransport(
30032
host_address=host_address,
30133
taskhub=taskhub,
30234
token_credential=token_credential,
@@ -306,7 +38,7 @@ def __init__(
30638
channel_options=channel_options)
30739

30840
def close(self) -> None:
309-
self._grpc_client.close()
41+
self._transport.close()
31042

31143
def enable_on_demand_sandbox_activities(self) -> None:
31244
"""Declare all configured on-demand sandbox worker profiles with Durable Task Scheduler."""
@@ -315,68 +47,14 @@ def enable_on_demand_sandbox_activities(self) -> None:
31547
raise ValueError("No configured on-demand sandbox activities were found.")
31648

31749
for declaration in declarations:
318-
self._grpc_client.declare_on_demand_sandbox_activities(declaration)
50+
self._transport.declare_on_demand_sandbox_activities(declaration)
31951

32052
def remove_on_demand_sandbox_activity_declaration(self, worker_profile_id: str) -> None:
32153
worker_profile_id = _normalize_required(worker_profile_id, "Worker profile ID is required.")
322-
self._grpc_client.remove_on_demand_sandbox_activity_declaration(worker_profile_id)
323-
324-
325-
def _normalize_optional_strings(values: Iterable[str]) -> list[str]:
326-
return [value.strip() for value in values if value and value.strip()]
54+
self._transport.remove_on_demand_sandbox_activity_declaration(worker_profile_id)
32755

32856

32957
def _normalize_required(value: Optional[str], message: str) -> str:
33058
if not value or not value.strip():
33159
raise ValueError(message)
33260
return value.strip()
333-
334-
335-
def _normalize_cpu(value: str) -> str:
336-
normalized = _normalize_required(value, "On-demand sandbox activity declaration requires CPU resources.")
337-
milli_cpu = _try_parse_cpu_millicores(normalized)
338-
if milli_cpu is None or milli_cpu <= 0:
339-
raise ValueError(
340-
"On-demand sandbox activity CPU resources must be a positive Kubernetes-style CPU quantity. "
341-
"Use formats like '500m', '2', or '0.5'.")
342-
return normalized
343-
344-
345-
def _normalize_memory(value: str) -> str:
346-
normalized = _normalize_required(value, "On-demand sandbox activity declaration requires memory resources.")
347-
memory_mib = _try_parse_memory_mib(normalized)
348-
if memory_mib is None or memory_mib <= 0:
349-
raise ValueError(
350-
"On-demand sandbox activity memory resources must be a positive Kubernetes-style memory quantity. "
351-
"Use formats like '256Mi', '1Gi', or '2048'.")
352-
return normalized
353-
354-
355-
def _try_parse_cpu_millicores(value: str) -> Optional[int]:
356-
try:
357-
if value[-1:].lower() == "m":
358-
return int(Decimal(value[:-1]))
359-
return int(Decimal(value) * 1000)
360-
except (InvalidOperation, ValueError):
361-
return None
362-
363-
364-
def _try_parse_memory_mib(value: str) -> Optional[int]:
365-
try:
366-
if value[-2:].lower() == "gi":
367-
return int(Decimal(value[:-2]) * 1024)
368-
if value[-2:].lower() == "mi":
369-
return int(Decimal(value[:-2]))
370-
return int(Decimal(value))
371-
except (InvalidOperation, ValueError):
372-
return None
373-
374-
375-
def _parse_substrate(substrate: Optional[str]) -> "pb.SubstrateKind":
376-
if not substrate:
377-
return pb.SUBSTRATE_KIND_UNSPECIFIED
378-
if substrate.lower() == "sandbox":
379-
return pb.SUBSTRATE_KIND_SANDBOX
380-
if substrate.lower() == "acasessionpool":
381-
return pb.SUBSTRATE_KIND_ACA_SESSION_POOL
382-
return pb.SUBSTRATE_KIND_UNSPECIFIED

0 commit comments

Comments
 (0)