Source Properties by Connector
Reference for all properties available in the from block when reading from each connector type. Every from block shares a set of universal attributes; the sections below document what operation means for each connector and what input.* variables are available in transforms.
Universal Attributes
Available on every from block regardless of connector type:
| Attribute |
Type |
Required |
Description |
connector |
string |
yes |
Name of the source connector |
operation |
string |
yes |
Event type or endpoint (meaning varies per connector — see below) |
format |
string |
no |
Input format: json, xml, csv (default: json) |
filter |
string/block |
no |
CEL condition to skip non-matching events |
from {
connector = "rabbit"
operation = "payments"
filter {
condition = "input.amount > 0"
on_reject = "requeue" # "ack" (discard), "reject" (DLQ), "requeue" (retry)
id_field = "input.payment_id"
max_requeue = 3
}
}
Schedule trigger (when)
Any flow can add a when block (cron) instead of an event-based from:
flow "nightly_sync" {
when {
schedule = "0 0 * * *" # Standard cron expression
timezone = "UTC"
}
to { ... }
}
REST Server
| Property |
Value |
| Connector type |
rest |
operation format |
"METHOD /path" — e.g., "GET /users", "POST /orders", "GET /users/:id" |
Path parameters use colon syntax (:id, :user_id).
| Variable |
Source |
Description |
input.<param> |
Path |
Path parameters by name (input.id, input.user_id) |
input.<param> |
Query |
Query string parameters by name (input.page, input.limit) |
input.<field> |
Body |
JSON/XML body fields merged directly (POST/PUT/PATCH) |
input.headers |
Headers |
Map of all request headers (lowercased keys) |
input.<field> |
Multipart |
File uploads: {filename, size, content_type, data} (base64) |
from {
connector = "api"
operation = "POST /users/:id/upload"
}
# Available: input.id (path), input.name (body), input.headers (map), input.avatar (file)
GraphQL Server
| Property |
Value |
| Connector type |
graphql |
operation format |
"Query.fieldName", "Mutation.fieldName", "Subscription.fieldName" |
| Variable |
Source |
Description |
input.<arg> |
Arguments |
GraphQL arguments passed to the field resolver |
from {
connector = "gql"
operation = "Mutation.createUser"
}
# Available: input.name, input.email (from mutation arguments)
gRPC Server
| Property |
Value |
| Connector type |
grpc |
operation format |
"Service/Method" or "package.Service/Method" |
| Variable |
Source |
Description |
input.<field> |
Proto message |
All protobuf message fields (decoded via JSON) |
from {
connector = "grpc_server"
operation = "UserService/CreateUser"
}
# Available: input.name, input.email (from proto request message)
SOAP Server
| Property |
Value |
| Connector type |
soap (with driver = "server") |
operation format |
SOAP operation name — e.g., "CreateOrder", "GetUser" |
Extracted from the SOAP envelope body element name.
| Variable |
Source |
Description |
input.<field> |
SOAP body |
Parameters parsed from the SOAP envelope body |
from {
connector = "soap_server"
operation = "CreateOrder"
}
# Available: input.customer_id, input.items (from SOAP body elements)
TCP Server
| Property |
Value |
| Connector type |
tcp |
operation format |
Message type string (json/msgpack) or NestJS pattern string |
| Variable |
Source |
Description |
input.<field> |
Message data |
All fields from msg.Data merged directly |
from {
connector = "tcp_server"
operation = "create_order"
}
# Available: input.product_id, input.quantity (from message data)
RabbitMQ
| Property |
Value |
| Connector type |
queue (with driver = "rabbitmq") |
operation format |
Routing key — e.g., "orders.created", "user.*", "#" |
Supports AMQP topic exchange patterns: * matches one word, # matches zero or more.
| Variable |
Source |
Description |
input.body |
Payload |
Parsed JSON (or raw string) |
input.headers |
AMQP |
AMQP headers as map |
input.properties |
AMQP |
Message properties (see below) |
input.routing_key |
AMQP |
The routing key |
input.exchange |
AMQP |
The exchange name |
input.properties fields: message_id, correlation_id, content_type, content_encoding, delivery_mode, priority, reply_to, expiration, type, user_id, app_id, timestamp, delivery_tag, redelivered.
from {
connector = "rabbit"
operation = "orders.created"
}
# Available: input.body.order_id, input.routing_key, input.properties.correlation_id
Kafka
| Property |
Value |
| Connector type |
queue (with driver = "kafka") |
operation format |
Topic name — e.g., "orders", "user-events" |
| Variable |
Source |
Description |
input.body |
Payload |
Parsed JSON (or raw string) |
input.headers |
Kafka |
Kafka headers as map |
input.topic |
Kafka |
Topic name |
input.partition |
Kafka |
Partition number |
input.offset |
Kafka |
Message offset |
input.key |
Kafka |
Message key (string) |
input.timestamp |
Kafka |
Unix timestamp |
from {
connector = "kafka"
operation = "order-events"
}
# Available: input.body.event_type, input.key, input.partition, input.offset
Redis Pub/Sub
| Property |
Value |
| Connector type |
queue (with driver = "redis") |
operation format |
Channel name or glob pattern — e.g., "orders", "user.*", "*" |
Exact channel match first, then pattern match (from PSubscribe), then wildcard "*".
| Variable |
Source |
Description |
input._channel |
Redis |
Channel the message was published to |
input._pattern |
Redis |
Pattern (if matched via PSubscribe), omitted for exact subscriptions |
input.<field> |
Payload |
JSON payload fields merged directly |
input.raw |
Payload |
Raw string payload (if not valid JSON) |
from {
connector = "redis_events"
operation = "orders.*"
}
# Available: input._channel ("orders.created"), input._pattern ("orders.*"), input.order_id
MQTT
| Property |
Value |
| Connector type |
mqtt |
operation format |
MQTT topic pattern — e.g., "sensors/+/temperature", "home/#" |
Supports MQTT wildcards: + matches single level, # matches multi-level.
| Variable |
Source |
Description |
input._topic |
MQTT |
Topic the message was received on |
input._message_id |
MQTT |
MQTT message ID |
input._qos |
MQTT |
QoS level (0, 1, or 2) |
input._retained |
MQTT |
Whether the message was retained |
input.<field> |
Payload |
JSON payload fields merged directly |
input._raw |
Payload |
Raw string payload (if not valid JSON) |
from {
connector = "mqtt_broker"
operation = "sensors/+/temperature"
}
# Available: input._topic ("sensors/room1/temperature"), input._qos, input.value, input.unit
WebSocket
| Property |
Value |
| Connector type |
websocket |
operation format |
Event type: "connect", "disconnect", "message", or custom type string |
| Event |
Variables |
"connect" |
input.event, input.remote_addr |
"disconnect" |
input.event |
"message" |
input.event, data fields merged into input, input.user_id |
| custom type |
input.event, input.data, input.room |
from {
connector = "ws"
operation = "message"
}
# Available: input.event ("message"), input.user_id, input.text (from message data)
SSE (Server-Sent Events)
| Property |
Value |
| Connector type |
sse |
operation format |
"connect" or "disconnect" |
SSE is unidirectional (server-to-client push). The from block only fires on lifecycle events.
| Event |
Variables |
"connect" |
input.event, input.client_id, input.remote_addr |
"disconnect" |
input.event, input.client_id |
from {
connector = "sse"
operation = "connect"
}
# Available: input.event, input.client_id, input.remote_addr
CDC (Change Data Capture)
| Property |
Value |
| Connector type |
cdc |
operation format |
"TRIGGER:table" — e.g., "INSERT:users", "UPDATE:orders", "*:*" |
Trigger is uppercase (INSERT, UPDATE, DELETE). Wildcards: "*:users" (any trigger), "INSERT:*" (any table), "*:*" or "*" (all).
| Variable |
Source |
Description |
input.trigger |
CDC |
"INSERT", "UPDATE", or "DELETE" |
input.table |
CDC |
Table name (lowercase) |
input.schema |
CDC |
Schema name (e.g., "public") |
input.timestamp |
CDC |
RFC3339 timestamp |
input.new |
CDC |
New row data (INSERT/UPDATE) |
input.old |
CDC |
Old row data (UPDATE/DELETE) |
from {
connector = "cdc"
operation = "INSERT:users"
}
# Available: input.trigger, input.table, input.new.email, input.new.id
File Watch
| Property |
Value |
| Connector type |
file (with watch = true) |
operation format |
Glob pattern — e.g., "*.csv", "reports/*.json", "**/*.csv" |
Matches against filename, relative path, or **/ prefix with filename suffix.
| Variable |
Source |
Description |
input._path |
File |
Relative path from base_path |
input._name |
File |
Filename only |
input._size |
File |
File size in bytes |
input._mod_time |
File |
RFC3339 modification time |
input._event |
File |
"created" or "modified" |
input._error |
File |
Error string (if file could not be read) |
input.<field> |
Content |
Single-row file fields merged directly |
input.rows |
Content |
Multi-row file content as array of maps |
from {
connector = "data_files"
operation = "*.csv"
}
# Available: input._path, input._name, input._event, input.rows (array of CSV rows)
Summary
| Connector |
operation format |
Key input.* fields |
| REST |
"METHOD /path" (e.g., "GET /users/:id") |
path params, query params, body fields, headers |
| GraphQL |
"Query.field" / "Mutation.field" / "Subscription.field" |
argument fields |
| gRPC |
"Service/Method" |
proto message fields |
| SOAP |
"OperationName" |
SOAP body element children |
| TCP |
message type/pattern string |
msg.Data fields |
| RabbitMQ |
routing key (* / # wildcards) |
body, headers, properties, routing_key, exchange |
| Kafka |
topic name |
body, headers, topic, partition, offset, key, timestamp |
| Redis Pub/Sub |
channel name or glob pattern |
_channel, _pattern, payload fields |
| MQTT |
topic pattern (+ / # wildcards) |
_topic, _message_id, _qos, _retained, payload fields |
| WebSocket |
"connect" / "disconnect" / "message" / custom type |
event, data fields, user_id, room |
| SSE |
"connect" / "disconnect" |
event, client_id, remote_addr |
| CDC |
"TRIGGER:table" (e.g., "INSERT:users") |
trigger, table, schema, new, old, timestamp |
| File watch |
glob pattern (e.g., "*.csv") |
_path, _name, _size, _mod_time, _event, content fields |
See also: Flows for from block syntax, Destination Properties for to block properties, Configuration Reference for all HCL blocks.