
Caching

Mycel provides two flow-level caching mechanisms: cache, which stores results to avoid repeated reads, and dedupe (since v2.1.0), which drops no-op writes whose persisted projection is byte-identical to the last one processed for the same key.

Cache Setup

First, define a cache connector:

# Redis (recommended for production and multi-instance)
connector "redis_cache" {
  type    = "cache"
  driver  = "redis"
  url     = env("REDIS_URL", "redis://localhost:6379")
  prefix  = "myapp:"
}

# In-memory (development or single-instance)
connector "local_cache" {
  type      = "cache"
  driver    = "memory"
  max_items = 10000
  eviction  = "lru"
}

Inline Cache Block

Add a cache block directly in a flow:

flow "get_product" {
  from {
    connector = "api"
    operation = "GET /products/:id"
  }

  cache {
    storage = "redis_cache"
    ttl     = "5m"
    key     = "'product:' + input.params.id"
  }

  to {
    connector = "db"
    target    = "products WHERE id = :id"
  }
}

When a request comes in:

1. Mycel computes the cache key.
2. If the key exists in the cache, the cached value is returned immediately and the to block does not execute.
3. If not, the to block executes and its result is stored in the cache.
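The three steps amount to a read-through lookup with expiry. The Python sketch below only illustrates that control flow. `TTLCache`, `get_product`, and the `run_to_block` callback are hypothetical stand-ins for the cache connector and the to block, not Mycel internals.

```python
import time
from typing import Any, Callable, Dict, Tuple

_MISS = object()  # sentinel so a cached None/False still counts as a hit


class TTLCache:
    """Hypothetical in-memory cache whose entries expire after a fixed TTL."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl_seconds
        self.clock = clock
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Any:
        entry = self._store.get(key)
        if entry is None:
            return _MISS
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._store[key]  # expired entries behave exactly like a miss
            return _MISS
        return value

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (self.clock() + self.ttl, value)


def get_product(cache: TTLCache, product_id: str, run_to_block: Callable[[str], Any]) -> Any:
    key = "product:" + product_id        # 1. compute the cache key
    cached = cache.get(key)
    if cached is not _MISS:              # 2. hit: the "to" step is skipped
        return cached
    result = run_to_block(product_id)    # 3. miss: run the "to" step...
    cache.set(key, result)               #    ...and store its result
    return result
```

With ttl = "5m" (300 seconds), a second request within five minutes never reaches the database, and the first request after expiry does.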

Cache Attributes

| Attribute | Type | Required | Description |
|---|---|---|---|
| storage | string | yes | Cache connector name |
| ttl | string | no | Time-to-live: "5m", "1h", "24h" |
| key | string | no | CEL expression for the cache key (default: auto-generated from the request) |
| invalidate_on | list | no | Event patterns that invalidate this cache entry |

Cache Key Expressions

The cache key must uniquely identify the request:

# Simple ID-based key
key = "'product:' + input.params.id"

# Multiple parameters
key = "'users:' + input.params.id + ':orders:' + input.query.status"

# Context-aware (per-user cache)
key = "'user_data:' + ctx.user_id"
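A key that leaves out a parameter the response depends on makes different requests share one entry. This Python sketch mirrors the multi-parameter CEL expression above as plain string concatenation; both function names are hypothetical, and `key_without_status` is a deliberately broken key used only for contrast.

```python
def key_with_status(params: dict, query: dict) -> str:
    # Mirrors: 'users:' + input.params.id + ':orders:' + input.query.status
    return "users:" + params["id"] + ":orders:" + query["status"]


def key_without_status(params: dict, query: dict) -> str:
    # Broken on purpose: ignores the status filter the response depends on.
    return "users:" + params["id"] + ":orders"
```

With the broken key, a request for open orders and a request for shipped orders of the same user would read each other's cached results.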

Named Caches

Define a named cache block to reuse cache configuration across multiple flows:

cache "products" {
  storage       = "redis_cache"
  ttl           = "10m"
  prefix        = "products"
  invalidate_on = ["product.updated", "product.deleted"]
}

Reference it in a flow:

flow "get_product" {
  from {
    connector = "api"
    operation = "GET /products/:id"
  }
  cache = cache.products
  to {
    connector = "db"
    target    = "products WHERE id = :id"
  }
}

Named Cache Attributes

| Attribute | Type | Description |
|---|---|---|
| storage | string | Cache connector name |
| ttl | string | Default TTL for entries |
| prefix | string | Key prefix for namespacing |
| invalidate_on | list | Event patterns that trigger invalidation |

Cache Invalidation

invalidate_on (automatic)

Cache entries are invalidated automatically when an event matching one of the listed patterns occurs:

flow "get_user" {
  from {
    connector = "api"
    operation = "GET /users/:id"
  }

  cache {
    storage       = "redis_cache"
    ttl           = "15m"
    key           = "'user:' + input.params.id"
    invalidate_on = ["user.updated:${input.params.id}", "user.deleted:${input.params.id}"]
  }

  to {
    connector = "db"
    target    = "users WHERE id = :id"
  }
}
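One way to picture the mechanism is an index from resolved event names to the cache keys they invalidate. This Python sketch assumes that the `${input.params.id}` interpolation is resolved when the entry is stored and that events are matched by exact name; `InvalidationIndex` is hypothetical, and Mycel's actual pattern matching may be broader.

```python
from collections import defaultdict
from typing import Dict, List, Set


class InvalidationIndex:
    """Hypothetical index: resolved event name -> cache keys to drop when it fires."""

    def __init__(self) -> None:
        self.cache: Dict[str, object] = {}
        self._by_event: Dict[str, Set[str]] = defaultdict(set)

    def store(self, key: str, value: object, invalidate_on: List[str]) -> None:
        self.cache[key] = value
        for event in invalidate_on:
            self._by_event[event].add(key)

    def on_event(self, event: str) -> None:
        # Drop every entry registered against this exact event name.
        for key in self._by_event.pop(event, set()):
            self.cache.pop(key, None)
```

Under this model, an event for user 7 leaves the entry for user 42 untouched, and only user.updated:42 or user.deleted:42 evicts it.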

after block (explicit, per-mutation flow)

Explicitly invalidate keys after a write operation:

flow "update_product" {
  from {
    connector = "api"
    operation = "PUT /products/:id"
  }
  to {
    connector = "db"
    target    = "UPDATE products"
  }

  after {
    invalidate {
      storage  = "redis_cache"
      keys     = ["product:${input.params.id}"]
      patterns = ["products:list:*"]
    }
  }
}

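keys removes exactly the listed keys. patterns removes every key that matches a glob-style pattern; for example, products:list:* matches both products:list:page1 and products:list:page2.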
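The difference between the two lists can be shown with Python's `fnmatch.fnmatchcase`, whose `*` wildcard behaves like the glob `*` used here. The `invalidate` helper is hypothetical and operates on a plain dict. A Redis-backed implementation would more likely iterate with SCAN and a MATCH pattern than load every key.

```python
from fnmatch import fnmatchcase
from typing import Dict, Iterable


def invalidate(store: Dict[str, object], keys: Iterable[str] = (), patterns: Iterable[str] = ()) -> None:
    """Hypothetical helper: drop exact keys, then every key matching a glob pattern."""
    for key in keys:
        store.pop(key, None)  # exact match only; missing keys are ignored
    pats = list(patterns)
    for key in list(store):   # snapshot so we can delete while iterating
        if any(fnmatchcase(key, p) for p in pats):
            del store[key]
```

Note that product:1 is removed only because it is listed in keys. The pattern products:list:* leaves product:2 and products:count alone.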

Deduplication

Since v2.1.0, the dedupe block is content-based and runs in two phases:

  • Phase A (after transform, before to) computes a canonical fingerprint over the projection declared in the fingerprint block and compares it byte-for-byte with the fingerprint stored for the same key. On a match, the message is handled according to on_duplicate and to is not invoked.
  • Phase B (after to succeeds) stores the new fingerprint. Because nothing is stored when to fails, a retried message is not discarded as a duplicate of its own failed attempt.
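The two phases can be sketched in a single process as follows. `TwoPhaseDeduper` is hypothetical. It stores the canonical encoding itself and compares strings, and it skips the array sorting Mycel's encoder performs. The point it shows is that the fingerprint is written only after `to` returns.

```python
import json
from typing import Any, Callable, Dict


class TwoPhaseDeduper:
    """Hypothetical single-process sketch of the Phase A / Phase B flow."""

    def __init__(self) -> None:
        self._stored: Dict[str, str] = {}  # key -> encoding of last successful write

    @staticmethod
    def encode(projection: Dict[str, Any]) -> str:
        # Stable encoding: sorted object keys, fixed separators (no array sorting here).
        return json.dumps(projection, sort_keys=True, separators=(",", ":"))

    def process(self, key: str, projection: Dict[str, Any], to: Callable[[], None]) -> str:
        fingerprint = self.encode(projection)
        # Phase A: compare with the stored fingerprint for this key.
        if self._stored.get(key) == fingerprint:
            return "duplicate"  # the caller would then apply on_duplicate
        to()  # if this raises, Phase B is skipped and nothing is stored
        # Phase B: record the fingerprint only after `to` succeeded.
        self._stored[key] = fingerprint
        return "processed"
```

If the first delivery fails inside `to`, the redelivery is processed normally. Only a later identical message is reported as a duplicate.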

The dedupe block also locks per key in-process (via the memory-backed SyncManager), so two workers cannot both pass Phase A with identical fingerprints and both call the downstream. For serialization across processes, such as multiple Mycel pods, combine it with an outer lock {} block on the same resource key.
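Why the lock matters: if the check in Phase A and the write in Phase B are not serialized, two workers holding the same message can both see "no match" and both call the downstream. This Python sketch, with a hypothetical `LockedDeduper` class, holds one `threading.Lock` per key across Phase A, `to`, and Phase B. Like the in-process lock described above, it protects only threads within one process, which is why cross-pod setups need the outer lock {} block.

```python
import json
import threading
from collections import defaultdict
from typing import Any, Callable, Dict


class LockedDeduper:
    """Hypothetical sketch: one lock per key held across Phase A, `to`, and Phase B."""

    def __init__(self) -> None:
        self._stored: Dict[str, str] = {}
        self._locks: Dict[str, threading.Lock] = defaultdict(threading.Lock)
        self._guard = threading.Lock()  # protects creation of per-key locks

    def _lock_for(self, key: str) -> threading.Lock:
        with self._guard:
            return self._locks[key]

    def process(self, key: str, projection: Dict[str, Any], to: Callable[[], None]) -> str:
        fingerprint = json.dumps(projection, sort_keys=True, separators=(",", ":"))
        with self._lock_for(key):  # other workers on this key wait here
            if self._stored.get(key) == fingerprint:  # Phase A
                return "duplicate"
            to()
            self._stored[key] = fingerprint           # Phase B
            return "processed"
```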

The typical use case is an MQ consumer whose upstream re-sends "update" messages even when nothing relevant changed: every redelivery hits a slow downstream, and the queue backs up. With dedupe, only messages whose persisted projection actually differs reach the downstream.

connector "fp_cache" {
  type   = "cache"
  driver = "redis"   # or "memory" for tests / single-pod
}

flow "process_payment" {
  from {
    connector = "rabbit"
    operation = "payments"
  }

  transform {
    payment_id = "input.payment_id"
    account_id = "input.account_id"
    amount     = "input.amount"
  }

  dedupe {
    cache        = "fp_cache"
    key          = "'payment:' + input.payment_id"
    ttl          = "24h"
    on_duplicate = "ack"
    fingerprint {
      payment_id = "output.payment_id"
      account_id = "output.account_id"
      amount     = "output.amount"
    }
  }

  to {
    connector = "db"
    target    = "payments"
  }
}

Dedupe Attributes

| Attribute | Type | Required | Default | Description |
|---|---|---|---|---|
| cache | string | yes | | Name of a connector with type = "cache". The connector pool is initialized once at startup, so the hot path does not pay a registry lookup per message. |
| key | string | yes | | CEL expression for the per-resource fingerprint key (evaluated against input.*). |
| fingerprint | {} block | yes | | Named CEL expressions whose values form the projection. Both input.* and output.* (the transform result) are in scope. Must list every persisted field: omitting one silently drops real changes. |
| ttl | string | no | | How long stored fingerprints are kept. Accepts the standard s/m/h units plus day and week suffixes such as "30d" and "2w"; malformed values fail at parse time. |
| on_duplicate | string | no | "ack" | Behavior on a fingerprint match: "ack", "reject", or "requeue". Uses the same vocabulary as sequence_guard, so MQ consumers handle both uniformly. |
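To make the ttl row concrete, here is a small Python parser for the documented formats. `parse_ttl` is hypothetical and deliberately restricted to one number followed by one unit (s, m, h, d, or w). The full grammar Mycel accepts beyond the examples in the table is not documented here.

```python
import re
from datetime import timedelta

# Seconds per unit: the usual s/m/h plus the day and week suffixes from the table.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86_400, "w": 604_800}
_TTL_RE = re.compile(r"([0-9]+)([smhdw])")


def parse_ttl(text: str) -> timedelta:
    """Parse a single-unit TTL such as "5m", "24h", "30d" or "2w" (hypothetical helper)."""
    match = _TTL_RE.fullmatch(text)
    if match is None:
        # Malformed values are rejected up front instead of silently defaulting.
        raise ValueError(f"malformed ttl: {text!r}")
    amount, unit = match.groups()
    return timedelta(seconds=int(amount) * _UNIT_SECONDS[unit])
```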

Pipeline order

The dedupe block runs after transform. The fingerprint expressions reference output.* (the transformed payload), so transform must run first. Earlier versions (≤ 2.0.0) had a key-based dedupe block that ran before transform; see CHANGELOG v2.1.0 for migration.

Array order-insensitivity

The canonical encoder sorts array elements before serialization, treating them as order-insensitive sets. This is appropriate for projections like "list of attribute values" or "set of website flags," but lossy for fields where order is semantically meaningful (e.g. a ranked list where position encodes priority).

For order-sensitive arrays, reshape them in transform before dedupe sees them, for example by joining the elements into a single delimited string:

transform {
  # Passing ranked_tags through as an array would lose its order in the fingerprint.
  # Joining it into one string makes the order part of the encoded value.
  ranked_tags = "input.ranked_tags.map(t, string(t)).join(',')"
}
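The effect of sorting arrays can be reproduced with a toy encoder. This Python sketch is not Mycel's encoder, whose exact sort order and byte format are not documented here. It sorts list elements by their own JSON encoding, which is enough to show that a reordered array yields the same encoding while a joined string does not.

```python
import json
from typing import Any


def canonical(value: Any) -> Any:
    """Recursively normalize a projection, treating arrays as order-insensitive sets."""
    if isinstance(value, dict):
        return {k: canonical(v) for k, v in value.items()}
    if isinstance(value, list):
        items = [canonical(v) for v in value]
        # Sort by each element's JSON text so mixed element types still compare.
        return sorted(items, key=lambda v: json.dumps(v, sort_keys=True))
    return value


def encode(projection: dict) -> str:
    return json.dumps(canonical(projection), sort_keys=True, separators=(",", ":"))
```

If individual elements can themselves contain the delimiter, pick a delimiter that cannot appear in the data. Otherwise two different lists could join to the same string.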

Caching vs Deduplication

| | Cache | Dedupe |
|---|---|---|
| Purpose | Avoid redundant downstream reads | Drop no-op writes |
| Applies to | Read flows | Write flows (especially MQ consumers) |
| Miss | Execute to, cache the result | Process normally; store the fingerprint after to succeeds |
| Hit | Return the cached value immediately | Drop without invoking to |
| Compares | Key only | Canonical content fingerprint |
| Pipeline position | Before to (read path) | After transform, before to |

Production Considerations

  • Use Redis for multi-instance deployments. In-memory cache is not shared across instances.
  • Set TTLs appropriate to your data freshness requirements. Stale cache is worse than no cache for critical data.
  • Use invalidate_on or the after block to invalidate caches on writes.
  • Monitor cache hit rates with the /metrics endpoint (Prometheus).
  • Use prefix or key expressions to prevent key collisions between services sharing a Redis instance.

Example: Read-Through Cache for Product Catalog

connector "redis_cache" {
  type    = "cache"
  driver  = "redis"
  url     = env("REDIS_URL", "redis://localhost:6379")
  prefix  = "catalog:"
}

# Cache product reads for 10 minutes
flow "get_product" {
  from {
    connector = "api"
    operation = "GET /products/:id"
  }

  cache {
    storage = "redis_cache"
    ttl     = "10m"
    key     = "'product:' + input.params.id"
  }

  to {
    connector = "db"
    target    = "products WHERE id = :id"
  }
}

# Invalidate on update
flow "update_product" {
  from {
    connector = "api"
    operation = "PUT /products/:id"
  }
  to {
    connector = "db"
    target    = "UPDATE products"
  }

  after {
    invalidate {
      storage = "redis_cache"
      keys    = ["product:${input.params.id}"]
    }
  }
}

# Deduplicate no-op inventory updates by content
flow "handle_inventory_update" {
  from {
    connector = "rabbit"
    operation = "inventory.updated"
  }

  transform {
    product_id  = "input.product_id"
    stock_qty   = "input.stock_qty"
    reorder_at  = "input.reorder_at"
  }

  dedupe {
    cache        = "redis_cache"
    key          = "'inv_fp:' + input.product_id"
    ttl          = "1h"
    on_duplicate = "ack"
    fingerprint {
      product_id = "output.product_id"
      stock_qty  = "output.stock_qty"
      reorder_at = "output.reorder_at"
    }
  }

  to {
    connector = "db"
    target    = "UPDATE products"
  }
}

See Also