Asya supports pluggable message queue transports for actor communication.
Overview¶
Transport layer is abstracted - sidecar implements transport interface, allowing different queue backends.
Supported Transports¶
Planned Transports¶
- Kafka: High-throughput distributed streaming
- NATS: Cloud-native messaging system
- Google Pub/Sub: GCP-managed messaging service
See KEDA scalers for potential integration targets.
Transport Configuration¶
Transports configured at operator installation time in deploy/helm-charts/asya-operator/values.yaml:
transports:
rabbitmq:
enabled: true
type: rabbitmq
config:
host: rabbitmq.default.svc.cluster.local
port: 5672
username: guest
passwordSecretRef:
name: rabbitmq-secret
key: password
exchange: asya # Optional, defaults to "asya"
queues:
autoCreate: true # Optional, defaults to true
forceRecreate: false # Optional, defaults to false
dlq:
enabled: true # Optional
maxRetryCount: 3 # Optional, defaults to 3
sqs:
enabled: true
type: sqs
config:
region: us-east-1
endpoint: "" # Optional, for LocalStack or custom SQS endpoints
visibilityTimeout: 300 # Optional, seconds, defaults to 300
waitTimeSeconds: 20 # Optional, seconds, defaults to 20
queues:
autoCreate: true # Optional, defaults to true
forceRecreate: false # Optional, defaults to false
dlq:
enabled: true # Optional
maxRetryCount: 3 # Optional, defaults to 3
retentionDays: 14 # Optional, defaults to 14
tags: # Optional, tags for created queues
Environment: production
Team: ml-platform
AsyncActors reference transport by name:
spec:
transport: sqs # or rabbitmq
Transport Interface¶
Sidecar implements (src/asya-sidecar/internal/transport/transport.go):
Receive(ctx, queueName): Receive single message from queue (blocking with long polling)Send(ctx, queueName, body): Send message body to queueAck(ctx, message): Acknowledge successful processingNack(ctx, message): Negative acknowledge (requeue or move to DLQ)
Queue Management¶
Queues automatically created by operator when AsyncActor reconciled.
Queue naming: asya-{actor_name}
Lifecycle:
- Created when AsyncActor created
- Deleted when AsyncActor deleted
- Preserved when AsyncActor updated
Adding New Transport¶
- Implement transport interface in
src/asya-sidecar/internal/transport/ - Add transport configuration to operator
- Add KEDA scaler configuration
- Update documentation
See src/asya-sidecar/internal/transport/ for implementation examples.