Cephalon.Data.MongoDB

Maturity: M2 · Ownership: provider-managed · Family: data-and-cdc · See audit, matrix.

Cephalon.Data.MongoDB is the MongoDB document-store companion pack for Cephalon. It proves that the companion-pack pattern established by Cephalon.Data.EntityFramework extends cleanly to non-relational providers without any changes to Cephalon.Engine or Cephalon.Abstractions. It also ships the first concrete provider-native CDC runner on top of the shared Cephalon.Data execution/runtime catalog family.

  • registers a singleton IMongoClient from a connection string and a singleton IMongoDatabase from the configured database name, using TryAdd semantics so a host-owned client is never displaced
  • registers a scoped IOutbox backed by the outbox_messages collection when RegisterOutbox is enabled; the collection name honors the optional CollectionPrefix
  • registers a scoped IEventDispatchStore over that same outbox collection when RegisterOutbox is enabled, so staged MongoDB events can be read and durable dispatch outcomes can be written back through the runtime-neutral eventing contract
  • registers a scoped IInbox backed by the inbox_receipts collection when RegisterInbox is enabled; the collection name honors the optional CollectionPrefix
  • ensures that outbox staging is idempotent by maintaining a unique index on MessageId and swallowing the MongoDB duplicate-key error (code 11000) on a repeated StageAsync call for the same message
  • contributes configured provider-native MongoDB change-stream captures through MongoDbChangeStreamCaptureOptions and keeps those descriptors on the shared /engine/cdc-captures* catalog with provider = "mongodb" and mode = "change-stream"
  • publishes the capability keys data.mongodb and data.document-store, plus data.outbox.mongodb, data.inbox.mongodb, and data.cdc.mongodb when the corresponding features are enabled; all of them are introspectable at runtime through the manifest
  • projects the outbox descriptor through the event-driven-integration technology surface as outbox-producers with provider: "mongodb" and mode: "document-collection" when that technology is active
  • projects the inbox descriptor through the same technology surface as inbox-stores when that technology is active
  • publishes the provider-native CDC execution graph mongodb-change-stream-capture-flow, hosted execution mongodb-change-stream-capture-pump, and execution runtime mongodb-change-stream-capture-pump when change-stream captures are configured
  • runs a provider-native hosted service that watches configured collections, stages outbox publications, persists resume-token checkpoints only after stage success, and reports runtime posture through the shared ICdcCaptureRuntimeReporter surface
  • stores durable change-stream resume tokens in the {CollectionPrefix}cdc_change_stream_checkpoints collection
  • preserves authored capture ownership through CdcCaptureDescriptor.SourceModuleId while surfacing metadata.contributorModuleId = "mongodb-data" when the provider pack contributes the descriptor on behalf of another module

Key source files:

  • Configuration/MongoDbDataOptions.cs
  • Configuration/MongoDbChangeStreamCaptureOptions.cs
  • Modules/MongoDbDataModule.cs
  • Registration/MongoDbDataEngineBuilderExtensions.cs
  • Services/MongoDbChangeStreamCaptureHostedService.cs
  • Services/MongoDbChangeStreamCheckpointEntry.cs
  • Services/MongoDbChangeStreamExecutionRuntimeContributor.cs
  • Services/MongoDbDataRuntimeIds.cs
  • Services/MongoDbOutboxEntry.cs
  • Services/MongoDbOutbox.cs
  • Services/MongoDbEventDispatchStore.cs
  • Services/MongoDbOutboxRuntimeSurfaceContributor.cs
  • Services/MongoDbInboxEntry.cs
  • Services/MongoDbInbox.cs
  • Services/MongoDbInboxRuntimeSurfaceContributor.cs

This pack sits on top of Cephalon.Data, not in place of it. Cephalon.Data still owns the runtime-neutral IReadStore / IWriteStore dispatching surface, the shared CDC runtime-state catalog, capture-side execution binding, and the additive execution-runtime contract. Cephalon.Data.MongoDB adds MongoDB-backed outbox and inbox persistence so event-driven workloads can stage and track messages without switching to a relational store, and it also exposes the same staged outbox through IEventDispatchStore so consumer-managed or adapter-owned dispatch loops can read pending items and persist durable dispatch outcomes without MongoDB-specific host glue.

The same pack now also proves the first provider-native CDC runner on top of those shared contracts. Instead of inventing a MongoDB-only registry, the pack contributes normal CdcCaptureDescriptor entries, binds them to the provider-native execution runtime mongodb-change-stream-capture-pump, stages captured changes through the linked IOutbox, persists resume-token checkpoints only after stage success, and reports freshness, lag, publication posture, checkpoints, and failures back through the shared /engine/cdc-captures/runtime* surface. The shared data-cdc-capture-pump remains additive and simply ignores captures whose effective owner resolves to the MongoDB runtime.
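The "checkpoint only after stage success" ordering described above can be illustrated with a minimal sketch. Everything here is hypothetical: ChangeBatch, PumpAsync, and the two delegates are illustrative stand-ins, not Cephalon or MongoDB driver types, and the real hosted service is not shown in this document.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for one change-stream batch; not a Cephalon or MongoDB type.
public sealed record ChangeBatch(IReadOnlyList<string> Changes, string ResumeToken);

public static class CheckpointAfterStageSketch
{
    // Processes batches in order. The checkpoint delegate runs only after the
    // stage delegate has completed without throwing for that same batch.
    public static async Task PumpAsync(
        IEnumerable<ChangeBatch> batches,
        Func<ChangeBatch, Task> stageAsync,
        Func<string, Task> saveCheckpointAsync)
    {
        foreach (var batch in batches)
        {
            // If staging throws, the exception propagates and the checkpoint
            // below is never written for this batch.
            await stageAsync(batch);

            // Reached only when staging succeeded.
            await saveCheckpointAsync(batch.ResumeToken);
        }
    }
}
```

With this ordering, a failed stage leaves the last durable checkpoint pointing at the previous batch, so a restart would re-read the failed batch instead of skipping it.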

The slice stays intentionally honest: it proves the companion-pack pattern for document-oriented stores, ships an idempotent outbox and inbox, and now also ships one collection-scoped provider-native CDC path. IReadStore and IWriteStore are not backed directly by MongoDB in this slice, and broader out-of-process CDC reporting and higher-level multi-capture orchestration remain later work.

engine.AddMongoDbData(
    connectionString: "mongodb://localhost:27017",
    databaseName: "myapp");

To enable the outbox and inbox paths:

engine.AddMongoDbData(
    connectionString: "mongodb://localhost:27017",
    databaseName: "myapp",
    configure: options =>
    {
        options.RegisterOutbox = true;
        options.RegisterInbox = true;
        options.CollectionPrefix = "app_"; // optional - prefix all Cephalon collections
    });

To enable the provider-native MongoDB CDC path, pair the pack with AddData() so the shared CDC catalogs and reporters exist:

engine.AddData();
engine.AddMongoDbData(options =>
{
    configuration.GetSection(MongoDbDataOptions.SectionPath).Bind(options);
    options.ConnectionStringName ??= "MongoDB";
    options.DatabaseName = "myapp";
    options.RegisterOutbox = true;
    options.CollectionPrefix = "app_";
    options.ChangeStreamCaptures.Add(new MongoDbChangeStreamCaptureOptions
    {
        Id = "catalog-items-cdc",
        DisplayName = "Catalog Items CDC",
        SourceModuleId = "catalog",
        CollectionName = "catalog_items",
        OutboxId = "mongodb-outbox",
        ChannelId = "catalog.items",
        MessageType = "CatalogItemChanged"
    });
});

If the same host also needs the shared Cephalon.Data in-process CDC pump for other non-MongoDB captures, enable it separately through AddData(options => options.EnableCdcExecution = true). The shared pump still skips any capture whose effective owner is mongodb-change-stream-capture-pump.

For configuration-driven hosts, prefer the options overload and let the pack resolve either ConnectionStringName from the root ConnectionStrings section or ConnectionString directly:

engine.AddData();
engine.AddMongoDbData(options =>
{
    configuration.GetSection(MongoDbDataOptions.SectionPath).Bind(options);
    options.ConnectionStringName ??= "MongoDB";
    options.DatabaseName = "myapp";
});

Matching configuration (Engine:Data:MongoDB):

{
  "ConnectionStrings": {
    "MongoDB": "mongodb://localhost:27017/?replicaSet=rs0"
  },
  "Engine": {
    "Data": {
      "MongoDB": {
        "ConnectionStringName": "MongoDB",
        "DatabaseName": "myapp",
        "CollectionPrefix": "app_",
        "RegisterOutbox": true,
        "ChangeStreamCaptures": [
          {
            "Id": "catalog-items-cdc",
            "DisplayName": "Catalog Items CDC",
            "SourceModuleId": "catalog",
            "CollectionName": "catalog_items",
            "OutboxId": "mongodb-outbox",
            "ChannelId": "catalog.items",
            "MessageType": "CatalogItemChanged",
            "FullDocumentMode": "update-lookup"
          }
        ]
      }
    }
  }
}

ConnectionStringName and ConnectionString are mutually exclusive. If both are set, the pack fails fast during service resolution. If neither is set, the pack falls back to mongodb://localhost:27017.
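As a rough illustration of those rules, the resolution order could look like the sketch below. The class, method, and parameter names are hypothetical and not the pack's actual API; the behavior for a named connection string that cannot be found is also an assumption, since the text above does not describe it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper; it mirrors the documented rules, not the pack's real implementation.
public static class ConnectionStringResolutionSketch
{
    public const string DefaultConnectionString = "mongodb://localhost:27017";

    public static string Resolve(
        string? connectionStringName,
        string? connectionString,
        IReadOnlyDictionary<string, string> connectionStrings)
    {
        bool hasName = !string.IsNullOrWhiteSpace(connectionStringName);
        bool hasInline = !string.IsNullOrWhiteSpace(connectionString);

        // Documented rule: the two options are mutually exclusive, so fail fast.
        if (hasName && hasInline)
            throw new InvalidOperationException(
                "Set either ConnectionStringName or ConnectionString, not both.");

        if (hasInline)
            return connectionString!;

        if (hasName)
        {
            if (connectionStrings.TryGetValue(connectionStringName!, out var resolved))
                return resolved;

            // Assumption: a missing named entry is treated as a configuration error.
            throw new InvalidOperationException(
                $"Connection string '{connectionStringName}' was not found.");
        }

        // Documented rule: neither option set falls back to the local default.
        return DefaultConnectionString;
    }
}
```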

Configuration options (Engine:Data:MongoDB)

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| ConnectionStringName | string? | null | Root ConnectionStrings key to resolve for MongoDB |
| ConnectionString | string? | null | Inline MongoDB connection string |
| DatabaseName | string | "cephalon" | Target database name |
| CollectionPrefix | string | "" | Optional prefix for all Cephalon-managed collections |
| RegisterOutbox | bool | false | Register IOutbox backed by the outbox_messages collection |
| RegisterInbox | bool | false | Register IInbox backed by the inbox_receipts collection |
| ChangeStreamCaptures | MongoDbChangeStreamCaptureOptions[] | [] | Contribute provider-native MongoDB change-stream CDC captures |

Change-stream capture options (Engine:Data:MongoDB:ChangeStreamCaptures[])

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| Id | string | required | Stable CDC capture id |
| DisplayName | string | Id | Operator-facing capture name |
| Description | string | generated | Human-readable capture description |
| SourceModuleId | string | required | Module id that owns the capture surface |
| SourceId | string | {provider}:{database}/{collection} | Logical upstream source id when it should differ from the watched collection path |
| DatabaseName | string? | pack-level DatabaseName | Database name to watch for this capture |
| CollectionName | string | required | MongoDB collection to watch |
| OutboxId | string | "mongodb-outbox" | Linked outbox id that receives publications |
| ChannelId | string | required | Logical outbox channel id for emitted publications |
| MessageType | string | required | Logical message type for emitted publications |
| EventFormat | string | "mongodb-change-stream-event" | Operator-facing event format projected on the descriptor |
| FullDocumentMode | string | "update-lookup" | MongoDB full-document mode: default, update-lookup, when-available, or required |
| MaxAwaitTimeSeconds | int | 5 | Maximum await time for one change-stream batch |
| BatchSize | int? | null | Optional batch-size hint for one change-stream batch |
| ResourceIds | string[] | ["{database}.{collection}"] | Explicit logical resources observed by the capture |
| Tags | string[] | ["cdc", "mongodb", "provider-native"] | Operator-facing tags on the descriptor |
| Metadata | Dictionary<string,string> | {} | Additional operator-facing metadata merged onto the descriptor |

Outbox collection schema (outbox_messages)

The collection name is {CollectionPrefix}outbox_messages.

| Field | BSON type | Notes |
| --- | --- | --- |
| _id | ObjectId | Auto-generated surrogate key |
| MessageId | string | Unique idempotency key (GUID); unique index prevents duplicate staging |
| EventType | string | Fully qualified CLR event type name |
| Payload | string | System.Text.Json-serialized event body |
| CorrelationId | string? | Optional causality tracking, propagated from IBehaviorContext.CorrelationId |
| TenantId | string? | Optional multi-tenancy discriminator |
| CreatedAtUtc | DateTime | UTC timestamp when the message was staged |
| DispatchedAtUtc | DateTime? | null until the message is marked dispatched |
| DispatchAttemptCount | int | Incremented on each dispatch attempt; starts at 0 |
| NextAttemptAtUtc | DateTime? | Populated by a dispatch adapter for delayed-retry intent |

Idempotency: A unique index on MessageId is created on first use. Calling StageAsync with a MessageId that already exists silently returns: the duplicate-key exception (error code 11000) is caught and swallowed.

Inbox collection schema (inbox_receipts)

The collection name is {CollectionPrefix}inbox_receipts.

| Field | BSON type | Notes |
| --- | --- | --- |
| _id | ObjectId | Auto-generated surrogate key |
| MessageId | string | Processed message id; unique index enforces exactly-once semantics |
| ProcessedAtUtc | DateTime | UTC timestamp when the message was first processed |

Idempotency: A unique index on MessageId is created on first use. MarkProcessedAsync swallows the duplicate-key error: calling it twice for the same id is safe.
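The idempotency contract shared by the outbox and inbox can be modeled without a database. The sketch below is an in-memory stand-in, assuming a dictionary key plays the role of the unique MessageId index; InMemoryIdempotentStore is a hypothetical type, not part of the pack. Against a real MongoDB collection the equivalent is an insert into a collection with a unique index, catching the duplicate-key write error (code 11000). Unlike the pack's StageAsync, which returns silently, this model returns a bool so the duplicate case is observable.

```csharp
using System.Collections.Concurrent;

// Hypothetical in-memory model of "unique index + swallow duplicate".
public sealed class InMemoryIdempotentStore
{
    // The dictionary key stands in for the unique index on MessageId.
    private readonly ConcurrentDictionary<string, string> _byMessageId = new();

    public int Count => _byMessageId.Count;

    // Returns true when the message was newly stored and false when the id
    // already existed. A duplicate never throws and never overwrites.
    public bool Stage(string messageId, string payload)
        => _byMessageId.TryAdd(messageId, payload);

    // Returns the stored payload, or null when the id is unknown.
    public string? GetPayload(string messageId)
        => _byMessageId.TryGetValue(messageId, out var payload) ? payload : null;
}
```

Staging the same id twice leaves exactly one record holding the first payload, which is the property that makes retried staging safe.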

When ChangeStreamCaptures are configured:

  • each capture is published through /engine/cdc-captures* with provider = "mongodb", mode = "change-stream", and an executionBinding whose authored and requested runtime id is mongodb-change-stream-capture-pump
  • the execution runtime is published through /engine/cdc-capture-runtimes* and snapshot.CdcCaptureExecutionRuntimes with executionOwnership = host-managed, executionTopology = provider-native, and acknowledgementMode = provider-native
  • the same runtime publishes through /engine/execution-graphs, /engine/hosted-executions, /engine/runtime-story, and snapshot under mongodb-change-stream-capture-flow plus mongodb-change-stream-capture-pump
  • the hosted runner watches one collection per configured capture, serializes the raw MongoDB BackingDocument as relaxed Extended JSON, stages one outbox message per change, and persists the latest resume token only after the linked outbox has accepted the batch
  • each staged outbox message uses deterministic id {cdcCaptureId}:{resumeTokenHash}, content type application/vnd.cephalon.mongodb.change-stream+json, headers for provider, cdcCaptureId, databaseName, collectionName, and operationType, plus metadata for sourceId, eventFormat, and resumeTokenHash
  • the runtime-state surface keeps typed freshness, current lag, pending-publication posture, checkpoint, change id, last operation type, checkpoint collection, and failure-kind metadata on the same /engine/cdc-captures/runtime* catalog instead of inventing a MongoDB-only monitor
  • if a provider pack contributes a descriptor on behalf of another module, the authored SourceModuleId remains authoritative and metadata.contributorModuleId keeps the contributing pack explicit
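The deterministic message id {cdcCaptureId}:{resumeTokenHash} from the list above can be sketched as follows. The helper names are hypothetical, and two details are assumptions rather than documented behavior: that the hash is computed over the UTF-8 bytes of the resume token's JSON text, and that it is rendered as lowercase hex. The only confirmed facts are the id format and that the hash is SHA-256 derived from the resume token.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper illustrating the {cdcCaptureId}:{resumeTokenHash} format.
public static class ChangeStreamMessageIdSketch
{
    // Assumption: hash the UTF-8 bytes of the resume token JSON and render
    // the 32-byte digest as 64 lowercase hex characters.
    public static string HashResumeToken(string resumeTokenJson)
    {
        byte[] hash = SHA256.HashData(Encoding.UTF8.GetBytes(resumeTokenJson));
        return Convert.ToHexString(hash).ToLowerInvariant();
    }

    // The same capture id and resume token always yield the same message id,
    // so re-staging a replayed change hits the outbox's unique MessageId index.
    public static string BuildMessageId(string cdcCaptureId, string resumeTokenJson)
        => $"{cdcCaptureId}:{HashResumeToken(resumeTokenJson)}";
}
```

Because the id depends only on the capture and the resume token, a change that is re-read after a restart maps to the same outbox MessageId and is deduplicated rather than published twice.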

Checkpoint collection schema (cdc_change_stream_checkpoints)

The collection name is {CollectionPrefix}cdc_change_stream_checkpoints.

| Field | BSON type | Notes |
| --- | --- | --- |
| _id | ObjectId | Auto-generated surrogate key |
| CdcCaptureId | string | Stable CDC capture id; one checkpoint record per capture |
| ResumeTokenJson | string | Relaxed Extended JSON copy of the latest durable resume token |
| ChangeId | string | SHA-256 hash derived from the persisted resume token |
| UpdatedAtUtc | DateTime | UTC timestamp of the last durable checkpoint write |

When MongoDbDataModule is active, the following capability keys appear in the runtime manifest:

| Capability key | When registered |
| --- | --- |
| data.mongodb | Always |
| data.document-store | Always |
| data.outbox.mongodb | RegisterOutbox = true |
| data.inbox.mongodb | RegisterInbox = true |
| data.cdc.mongodb | ChangeStreamCaptures.Count > 0 |
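The registration rules in that table reduce to a small decision function. The sketch below is illustrative only: CapabilityKeySketch and its parameters are hypothetical names, and the real module presumably derives these values from MongoDbDataOptions.

```csharp
using System.Collections.Generic;

// Hypothetical helper mirroring the capability table; not the module's real code.
public static class CapabilityKeySketch
{
    public static IReadOnlyList<string> Resolve(
        bool registerOutbox,
        bool registerInbox,
        int changeStreamCaptureCount)
    {
        // These two keys are always published when the module is active.
        var keys = new List<string> { "data.mongodb", "data.document-store" };

        if (registerOutbox) keys.Add("data.outbox.mongodb");
        if (registerInbox) keys.Add("data.inbox.mongodb");
        if (changeStreamCaptureCount > 0) keys.Add("data.cdc.mongodb");

        return keys;
    }
}
```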

When the event-driven-integration technology is active, the following entries appear under /engine/snapshot:

| Surface | Entry id | provider metadata |
| --- | --- | --- |
| outbox-producers | mongodb-outbox | mongodb |
| inbox-stores | mongodb-inbox | mongodb |

When provider-native change-stream captures are configured, the following additional runtime entries appear:

| Surface | Entry id | Notes |
| --- | --- | --- |
| /engine/cdc-capture-runtimes | mongodb-change-stream-capture-pump | Provider-native execution-runtime descriptor and aggregate summary |
| /engine/hosted-executions | mongodb-change-stream-capture-pump | Background-service execution entry |
| /engine/execution-graphs | mongodb-change-stream-capture-flow | Provider-native change-stream execution flow |

This pack intentionally still does not claim:

  • IReadStore / IWriteStore dispatch backed by MongoDB: query and command handlers should use IMongoDatabase directly
  • transaction-scoped outbox staging (no MongoDB multi-document transaction is opened; each StageAsync is a single InsertOneAsync)
  • pack-owned dispatch loops or broker-specific retry scheduling beyond the runtime-neutral IEventDispatchStore bridge
  • projection rebuild orchestration
  • out-of-process or externally reported MongoDB change-stream runtime observation beyond the in-process hosted service
  • broader multi-collection orchestration or low-code capture generation beyond the current one-capture-per-configured-collection path

These remain explicit later slices so the current provider claim stays truthful.