Connectors
A connector is a bidirectional adapter between Mycel and an external system. A connector can act as a source (it receives data that triggers a flow), as a target (the destination a flow writes to), or as both. Some are naturally one-directional: email is output-only and cron is input-only. The table below shows which roles each type supports.
Connector Table

| Type | Driver / Examples | As Source | As Target |
|---|---|---|---|
| rest | HTTP server | Expose endpoints | — |
| http | HTTP client | — | Call APIs |
| database | postgres, mysql, sqlite, mongodb | Query data | Insert/Update/Delete |
| graphql | GraphQL server/client | Expose schema | Query/Mutate |
| queue | rabbitmq, kafka, redis | Consume messages | Publish messages |
| grpc | gRPC server/client | Expose services | Call services |
| tcp | TCP server/client | Receive connections | Send data |
| cache | memory, redis | — | Read/write cache |
| file | Local filesystem | Watch for files | Write files |
| s3 | AWS S3, MinIO | Read objects | Write objects |
| websocket | WebSocket server | — | Push to clients |
| sse | Server-Sent Events | — | Push events |
| cdc | PostgreSQL WAL | Stream DB changes | — |
| exec | Shell commands | — | Execute commands |
| email | SMTP | — | Send emails |
| slack | Slack API | — | Send messages |
| discord | Discord API | — | Send messages |
| sms | Twilio | — | Send SMS |
| push | FCM, APNs | — | Push notifications |
| webhook | HTTP callbacks | — | Send webhooks |
| soap | SOAP 1.1/1.2 | Expose SOAP endpoints | Call SOAP services |
| elasticsearch | Elasticsearch | — | Index/Search |
| oauth | Google, GitHub, Apple, OIDC | OAuth callback | — |
| mqtt | MQTT 3.1.1/5.0 | Subscribe to topics | Publish messages |
| ftp | FTP, FTPS, SFTP | List/Download files | Upload/Delete files |

Defining a Connector
A connector is declared in a connector block, and the block's label is the connector's name. Flows reference a connector by that name by setting connector = "NAME" in a from or to block.
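
As a minimal sketch of that wiring, the fragment below declares one connector and references it from a flow. It reuses the "api" and "db" connectors and the "find_active_users" operation that appear later on this page; the flow name "example_flow" is hypothetical.

```hcl
# The label after `connector` is the name that flows use to reference it.
connector "api" {
  type = "rest"
  port = 3000
}

flow "example_flow" {        # hypothetical flow name
  from {
    connector = "api"        # "api" acts as the source
    operation = "GET /users"
  }
  to {
    connector = "db"         # "db" (declared elsewhere) acts as the target
    operation = "find_active_users"
  }
}
```
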
Common Connectors
REST Server

    connector "api" {
      type = "rest"
      port = 3000

      cors {
        origins = ["*"]
        methods = ["GET", "POST", "PUT", "DELETE", "OPTIONS"]
        headers = ["Content-Type", "Authorization"]
      }
    }

HTTP Client

    connector "external_api" {
      type     = "http"
      base_url = "https://api.example.com"
      timeout  = "30s"

      auth {
        type  = "bearer"
        token = env("API_TOKEN")
      }

      retry {
        count    = 3
        interval = "1s"
        backoff  = 2.0
      }
    }

Database

    # PostgreSQL
    connector "db" {
      type     = "database"
      driver   = "postgres"
      host     = env("PG_HOST")
      port     = 5432
      database = env("PG_DATABASE")
      user     = env("PG_USER")
      password = env("PG_PASSWORD")
      ssl_mode = "require"

      pool {
        max          = 100
        min          = 10
        max_lifetime = 300
      }
    }

    # MySQL
    connector "mysql" {
      type     = "database"
      driver   = "mysql"
      host     = env("MYSQL_HOST")
      port     = 3306
      database = env("MYSQL_DATABASE")
      user     = env("MYSQL_USER")
      password = env("MYSQL_PASSWORD")
    }

    # SQLite (no server needed)
    connector "local_db" {
      type     = "database"
      driver   = "sqlite"
      database = "./data.db"
    }

    # MongoDB
    connector "mongo" {
      type     = "database"
      driver   = "mongodb"
      uri      = env("MONGO_URI")
      database = "myapp"
    }

Message Queue

    # RabbitMQ
    connector "rabbit" {
      type     = "mq"
      driver   = "rabbitmq"
      host     = env("RABBITMQ_HOST")
      port     = 5672
      user     = "guest"
      password = env("RABBITMQ_PASS")
      vhost    = "/"
    }

    # Kafka
    connector "kafka" {
      type    = "mq"
      driver  = "kafka"
      brokers = ["kafka:9092"]
    }

    # Redis Pub/Sub
    connector "redis_events" {
      type     = "mq"
      driver   = "redis"
      url      = env("REDIS_URL", "redis://localhost:6379")
      channels = ["orders", "payments"]
    }

Cache

    # Redis
    connector "cache" {
      type        = "cache"
      driver      = "redis"
      url         = env("REDIS_URL", "redis://localhost:6379")
      default_ttl = "1h"
      prefix      = "myapp:"
    }

    # In-memory (no external service)
    connector "local_cache" {
      type      = "cache"
      driver    = "memory"
      max_items = 10000
      eviction  = "lru"
    }

GraphQL

    # Server
    connector "gql" {
      type       = "graphql"
      driver     = "server"
      port       = 4000
      endpoint   = "/graphql"
      playground = true

      subscriptions {
        enabled   = true
        transport = "websocket"
        path      = "/graphql/ws"
      }
    }

    # Client
    connector "external_gql" {
      type     = "graphql"
      driver   = "client"
      endpoint = "https://api.example.com/graphql"
      timeout  = "30s"
    }

gRPC

    # Server
    connector "grpc_api" {
      type        = "grpc"
      driver      = "server"
      port        = 50051
      proto_path  = "./proto"
      proto_files = ["user.proto", "order.proto"]
      reflection  = true
    }

    # Client
    connector "user_service" {
      type           = "grpc"
      driver         = "client"
      target         = "users-service:50051"
      proto_path     = "./proto"
      proto_files    = ["user.proto"]
      insecure       = false
      wait_for_ready = true
    }

File System

    connector "files" {
      type        = "file"
      base_path   = "./data"
      format      = "json"
      create_dirs = true

      # Enable file watching (triggers flows on new/modified files)
      watch          = true
      watch_interval = "5s"
    }

S3

    connector "storage" {
      type   = "s3"
      bucket = env("S3_BUCKET")
      region = env("AWS_REGION")

      # For MinIO or custom S3-compatible
      endpoint         = env("S3_ENDPOINT")
      access_key       = env("S3_ACCESS_KEY")
      secret_key       = env("S3_SECRET_KEY")
      force_path_style = true
    }

Named Operations
Named operations define reusable parameterized queries on a connector. Instead of repeating SQL or API call patterns across flows, define them once and reference them by name.

    connector "db" {
      type   = "database"
      driver = "postgres"
      # ... connection details

      operation "find_active_users" {
        query  = "SELECT * FROM users WHERE status = 'active' AND org_id = $1"
        params = [{ name = "org_id", type = "string", required = true }]
      }

      operation "deactivate_user" {
        query  = "UPDATE users SET status = 'inactive' WHERE id = $1"
        params = [{ name = "id", type = "string", required = true }]
      }
    }

Then in flows:

    flow "list_active_users" {
      from {
        connector = "api"
        operation = "GET /users"
      }
      to {
        connector = "db"
        operation = "find_active_users"
      }
    }

See the named-operations example for complete patterns.
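
A write-style operation can presumably be wired the same way. The sketch below routes a request to the deactivate_user operation defined above. The flow name and the POST /users/deactivate route are hypothetical, and this page does not show how a request supplies the id parameter, so no parameter mapping is included.

```hcl
flow "deactivate_user_flow" {                # hypothetical flow name
  from {
    connector = "api"
    operation = "POST /users/deactivate"     # hypothetical route
  }
  to {
    connector = "db"
    operation = "deactivate_user"            # named operation declared on "db"
  }
}
```
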
Connector Profiles
Profiles allow a single connector to have multiple backends, with one selected at runtime. They are useful for multi-tenant systems, A/B testing, or read/write splitting.

    connector "db" {
      type    = "database"
      driver  = "postgres"
      select  = "input.tenant_id" # CEL expression to pick profile
      default = "primary"

      profile "primary" {
        host     = env("PRIMARY_HOST")
        database = "app"
        user     = env("DB_USER")
        password = env("DB_PASSWORD")
      }

      profile "analytics" {
        host     = env("ANALYTICS_HOST")
        database = "app_analytics"
        user     = env("DB_USER")
        password = env("DB_PASSWORD")
      }
    }

The select expression is evaluated at flow execution time. Profiles can also use fallback for failover:

    connector "cache" {
      type     = "cache"
      driver   = "redis"
      fallback = ["primary", "secondary"]

      profile "primary" {
        url = env("REDIS_PRIMARY_URL")
      }

      profile "secondary" {
        url = env("REDIS_SECONDARY_URL")
      }
    }

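
Because select is a CEL expression, it can presumably compute a profile name rather than read one directly from the input. The sketch below assumes the expression's result is matched against the profile names and that default applies when nothing matches. The tenant ID "acme" is hypothetical, and the remaining connection details are elided.

```hcl
connector "db" {
  type   = "database"
  driver = "postgres"

  # Assumption: the expression must evaluate to a profile name.
  # The CEL conditional routes one (hypothetical) tenant to "analytics"
  # and every other tenant to "primary".
  select  = "input.tenant_id == 'acme' ? 'analytics' : 'primary'"
  default = "primary"

  profile "primary" {
    host = env("PRIMARY_HOST")
    # ... remaining connection details
  }

  profile "analytics" {
    host = env("ANALYTICS_HOST")
    # ... remaining connection details
  }
}
```
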
See the profiles example for details.
Per-Connector Reference
For complete configuration options and examples for each connector type, see the Connector Catalog.