Cephalon.Data.MongoDB
Maturity:
M2· Ownership: provider-managed · Family:data-and-cdc· See audit, matrix.
Cephalon.Data.MongoDB is the MongoDB document-store companion pack for Cephalon. It proves that the companion-pack pattern established by Cephalon.Data.EntityFramework extends cleanly to non-relational providers without any changes to Cephalon.Engine or Cephalon.Abstractions, and it now also proves the first concrete provider-native CDC runner on top of the shared Cephalon.Data execution/runtime catalog family.
What it owns
Section titled “What it owns”- registers a singleton
IMongoClientfrom a connection string and a singletonIMongoDatabasefrom the configured database name, usingTryAddsemantics so a host-owned client is never displaced - registers a scoped
IOutboxbacked by theoutbox_messagescollection whenRegisterOutboxis enabled; the collection name honors the optionalCollectionPrefix - registers a scoped
IEventDispatchStoreover that same outbox collection whenRegisterOutboxis enabled, so staged MongoDB events can be read and durable dispatch outcomes can be written back through the runtime-neutral eventing contract - registers a scoped
IInboxbacked by theinbox_receiptscollection whenRegisterInboxis enabled; the collection name honors the optionalCollectionPrefix - ensures that outbox staging is idempotent by maintaining a unique index on
MessageIdand swallowing the MongoDB duplicate-key error (code 11000) on a repeatedStageAsynccall for the same message - contributes configured provider-native MongoDB change-stream captures through
MongoDbChangeStreamCaptureOptionsand keeps those descriptors on the shared/engine/cdc-captures*catalog withprovider = "mongodb"andmode = "change-stream" - publishes capability metadata
data.mongodb,data.document-store, and optionallydata.outbox.mongodb,data.inbox.mongodb, anddata.cdc.mongodbintrospectable at runtime through the manifest - projects the outbox descriptor through the
event-driven-integrationtechnology surface asoutbox-producerswithprovider: "mongodb"andmode: "document-collection"when that technology is active - projects the inbox descriptor through the same technology surface as
inbox-storeswhen that technology is active - publishes the provider-native CDC execution graph
mongodb-change-stream-capture-flow, hosted executionmongodb-change-stream-capture-pump, and execution runtimemongodb-change-stream-capture-pumpwhen change-stream captures are configured - runs a provider-native hosted service that watches configured collections, stages outbox publications, persists resume-token checkpoints only after stage success, and reports runtime posture through the shared
ICdcCaptureRuntimeReportersurface - stores durable change-stream resume tokens in the
{CollectionPrefix}cdc_change_stream_checkpointscollection - preserves authored capture ownership through
CdcCaptureDescriptor.SourceModuleIdwhile surfacingmetadata.contributorModuleId = "mongodb-data"when the provider pack contributes the descriptor on behalf of another module
Main surfaces
Section titled “Main surfaces”Configuration/MongoDbDataOptions.csConfiguration/MongoDbChangeStreamCaptureOptions.csModules/MongoDbDataModule.csRegistration/MongoDbDataEngineBuilderExtensions.csServices/MongoDbChangeStreamCaptureHostedService.csServices/MongoDbChangeStreamCheckpointEntry.csServices/MongoDbChangeStreamExecutionRuntimeContributor.csServices/MongoDbDataRuntimeIds.csServices/MongoDbOutboxEntry.csServices/MongoDbOutbox.csServices/MongoDbEventDispatchStore.csServices/MongoDbOutboxRuntimeSurfaceContributor.csServices/MongoDbInboxEntry.csServices/MongoDbInbox.csServices/MongoDbInboxRuntimeSurfaceContributor.cs
How it fits
Section titled “How it fits”This pack sits on top of Cephalon.Data, not in place of it. Cephalon.Data still owns the runtime-neutral IReadStore / IWriteStore dispatching surface, the shared CDC runtime-state catalog, capture-side execution binding, and the additive execution-runtime contract. Cephalon.Data.MongoDB adds MongoDB-backed outbox and inbox persistence so event-driven workloads can stage and track messages without switching to a relational store, and it also exposes the same staged outbox through IEventDispatchStore so consumer-managed or adapter-owned dispatch loops can read pending items and persist durable dispatch outcomes without MongoDB-specific host glue.
The same pack now also proves the first provider-native CDC runner on top of those shared contracts. Instead of inventing a MongoDB-only registry, the pack contributes normal CdcCaptureDescriptor entries, binds them to the provider-native execution runtime mongodb-change-stream-capture-pump, stages captured changes through the linked IOutbox, persists resume-token checkpoints only after stage success, and reports freshness, lag, publication posture, checkpoints, and failures back through the shared /engine/cdc-captures/runtime* surface. The shared data-cdc-capture-pump remains additive and simply ignores captures whose effective owner resolves to the MongoDB runtime.
The slice stays intentionally honest: it proves the companion-pack pattern for document-oriented stores, ships an idempotent outbox and inbox, and now also ships one collection-scoped provider-native CDC path. IReadStore and IWriteStore are still not backed directly by MongoDB in this slice, and broader out-of-process CDC reporting or higher-level multi-capture orchestration still remain later work.
Registration
Section titled “Registration”engine.AddMongoDbData( connectionString: "mongodb://localhost:27017", databaseName: "myapp");To enable the outbox and inbox paths:
engine.AddMongoDbData( connectionString: "mongodb://localhost:27017", databaseName: "myapp", configure: options => { options.RegisterOutbox = true; options.RegisterInbox = true; options.CollectionPrefix = "app_"; // optional - prefix all Cephalon collections });To enable the provider-native MongoDB CDC path, pair the pack with AddData() so the shared CDC catalogs and reporters exist:
engine.AddData();engine.AddMongoDbData(options =>{ configuration.GetSection(MongoDbDataOptions.SectionPath).Bind(options); options.ConnectionStringName ??= "MongoDB"; options.DatabaseName = "myapp"; options.RegisterOutbox = true; options.CollectionPrefix = "app_";
options.ChangeStreamCaptures.Add(new MongoDbChangeStreamCaptureOptions { Id = "catalog-items-cdc", DisplayName = "Catalog Items CDC", SourceModuleId = "catalog", CollectionName = "catalog_items", OutboxId = "mongodb-outbox", ChannelId = "catalog.items", MessageType = "CatalogItemChanged" });});If the same host also needs the shared Cephalon.Data in-process CDC pump for other non-MongoDB captures, enable it separately through AddData(options => options.EnableCdcExecution = true). The shared pump still skips any capture whose effective owner is mongodb-change-stream-capture-pump.
For configuration-driven hosts, prefer the options overload and let the pack resolve either ConnectionStringName from the root ConnectionStrings section or ConnectionString directly:
engine.AddData();engine.AddMongoDbData(options =>{ configuration.GetSection(MongoDbDataOptions.SectionPath).Bind(options); options.ConnectionStringName ??= "MongoDB"; options.DatabaseName = "myapp";});{ "ConnectionStrings": { "MongoDB": "mongodb://localhost:27017/?replicaSet=rs0" }, "Engine": { "Data": { "MongoDB": { "ConnectionStringName": "MongoDB", "DatabaseName": "myapp", "CollectionPrefix": "app_", "RegisterOutbox": true, "ChangeStreamCaptures": [ { "Id": "catalog-items-cdc", "DisplayName": "Catalog Items CDC", "SourceModuleId": "catalog", "CollectionName": "catalog_items", "OutboxId": "mongodb-outbox", "ChannelId": "catalog.items", "MessageType": "CatalogItemChanged", "FullDocumentMode": "update-lookup" } ] } } }}ConnectionStringName and ConnectionString are mutually exclusive. If both are set, the pack fails fast during service resolution. If neither is set, MongoDB falls back to mongodb://localhost:27017.
Configuration options (Engine:Data:MongoDB)
Section titled “Configuration options (Engine:Data:MongoDB)”| Option | Type | Default | Description |
|---|---|---|---|
ConnectionStringName | string? | null | Root ConnectionStrings key to resolve for MongoDB |
ConnectionString | string? | null | Inline MongoDB connection string |
DatabaseName | string | "cephalon" | Target database name |
CollectionPrefix | string | "" | Optional prefix for all Cephalon-managed collections |
RegisterOutbox | bool | false | Register IOutbox backed by the outbox_messages collection |
RegisterInbox | bool | false | Register IInbox backed by the inbox_receipts collection |
ChangeStreamCaptures | MongoDbChangeStreamCaptureOptions[] | [] | Contribute provider-native MongoDB change-stream CDC captures |
Change-stream capture options (Engine:Data:MongoDB:ChangeStreamCaptures[])
Section titled “Change-stream capture options (Engine:Data:MongoDB:ChangeStreamCaptures[])”| Option | Type | Default | Description |
|---|---|---|---|
Id | string | required | Stable CDC capture id |
DisplayName | string | Id | Operator-facing capture name |
Description | string | generated | Human-readable capture description |
SourceModuleId | string | required | Module id that owns the capture surface |
SourceId | string | {provider}:{database}/{collection} | Logical upstream source id when it should differ from the watched collection path |
DatabaseName | string? | pack-level DatabaseName | Database name to watch for this capture |
CollectionName | string | required | MongoDB collection to watch |
OutboxId | string | "mongodb-outbox" | Linked outbox id that receives publications |
ChannelId | string | required | Logical outbox channel id for emitted publications |
MessageType | string | required | Logical message type for emitted publications |
EventFormat | string | "mongodb-change-stream-event" | Operator-facing event format projected on the descriptor |
FullDocumentMode | string | "update-lookup" | MongoDB full-document mode: default, update-lookup, when-available, or required |
MaxAwaitTimeSeconds | int | 5 | Maximum await time for one change-stream batch |
BatchSize | int? | null | Optional batch-size hint for one change-stream batch |
ResourceIds | string[] | ["{database}.{collection}"] | Explicit logical resources observed by the capture |
Tags | string[] | ["cdc", "mongodb", "provider-native"] | Operator-facing tags on the descriptor |
Metadata | Dictionary<string,string> | {} | Additional operator-facing metadata merged onto the descriptor |
Outbox collection schema (outbox_messages)
Section titled “Outbox collection schema (outbox_messages)”The collection name is {CollectionPrefix}outbox_messages.
| Field | BSON type | Notes |
|---|---|---|
_id | ObjectId | Auto-generated surrogate key |
MessageId | string | Unique idempotency key (GUID); unique index prevents duplicate staging |
EventType | string | Fully qualified CLR event type name |
Payload | string | System.Text.Json-serialized event body |
CorrelationId | string? | Optional causality tracking, propagated from IBehaviorContext.CorrelationId |
TenantId | string? | Optional multi-tenancy discriminator |
CreatedAtUtc | DateTime | UTC timestamp when the message was staged |
DispatchedAtUtc | DateTime? | null until the message is marked dispatched |
DispatchAttemptCount | int | Incremented on each dispatch attempt; starts at 0 |
NextAttemptAtUtc | DateTime? | Populated by a dispatch adapter for delayed-retry intent |
Idempotency: A unique index on MessageId is created on first use. Calling StageAsync with a MessageId that already exists silently returns: the duplicate-key exception (error code 11000) is caught and swallowed.
Inbox collection schema (inbox_receipts)
Section titled “Inbox collection schema (inbox_receipts)”The collection name is {CollectionPrefix}inbox_receipts.
| Field | BSON type | Notes |
|---|---|---|
_id | ObjectId | Auto-generated surrogate key |
MessageId | string | Processed message id; unique index enforces exactly-once semantics |
ProcessedAtUtc | DateTime | UTC timestamp when the message was first processed |
Idempotency: A unique index on MessageId is created on first use. MarkProcessedAsync swallows the duplicate-key error: calling it twice for the same id is safe.
Provider-native CDC runtime
Section titled “Provider-native CDC runtime”When ChangeStreamCaptures are configured:
- each capture is published through
/engine/cdc-captures*withprovider = "mongodb",mode = "change-stream", and anexecutionBindingwhose authored and requested runtime id ismongodb-change-stream-capture-pump - the execution runtime is published through
/engine/cdc-capture-runtimes*andsnapshot.CdcCaptureExecutionRuntimeswithexecutionOwnership = host-managed,executionTopology = provider-native, andacknowledgementMode = provider-native - the same runtime publishes through
/engine/execution-graphs,/engine/hosted-executions,/engine/runtime-story, andsnapshotundermongodb-change-stream-capture-flowplusmongodb-change-stream-capture-pump - the hosted runner watches one collection per configured capture, serializes the raw MongoDB
BackingDocumentas relaxed Extended JSON, stages one outbox message per change, and only persists the latest resume token after the linked outbox accepted the batch - each staged outbox message uses deterministic id
{cdcCaptureId}:{resumeTokenHash}, content typeapplication/vnd.cephalon.mongodb.change-stream+json, headers forprovider,cdcCaptureId,databaseName,collectionName, andoperationType, plus metadata forsourceId,eventFormat, andresumeTokenHash - the runtime-state surface keeps typed freshness, current lag, pending-publication posture, checkpoint, change id, last operation type, checkpoint collection, and failure-kind metadata on the same
/engine/cdc-captures/runtime*catalog instead of inventing a MongoDB-only monitor - if a provider pack contributes a descriptor on behalf of another module, the authored
SourceModuleIdremains authoritative andmetadata.contributorModuleIdkeeps the contributing pack explicit
Checkpoint collection schema (cdc_change_stream_checkpoints)
Section titled “Checkpoint collection schema (cdc_change_stream_checkpoints)”The collection name is {CollectionPrefix}cdc_change_stream_checkpoints.
| Field | BSON type | Notes |
|---|---|---|
_id | ObjectId | Auto-generated surrogate key |
CdcCaptureId | string | Stable CDC capture id; one checkpoint record per capture |
ResumeTokenJson | string | Relaxed Extended JSON copy of the latest durable resume token |
ChangeId | string | SHA-256 hash derived from the persisted resume token |
UpdatedAtUtc | DateTime | UTC timestamp of the last durable checkpoint write |
Runtime capabilities
Section titled “Runtime capabilities”When MongoDbDataModule is active, the following capability keys appear in the runtime manifest:
| Capability key | When registered |
|---|---|
data.mongodb | Always |
data.document-store | Always |
data.outbox.mongodb | RegisterOutbox = true |
data.inbox.mongodb | RegisterInbox = true |
data.cdc.mongodb | ChangeStreamCaptures.Count > 0 |
Runtime surface entries
Section titled “Runtime surface entries”When the event-driven-integration technology is active, the following entries appear under /engine/snapshot:
| Surface | Entry id | provider metadata |
|---|---|---|
outbox-producers | mongodb-outbox | mongodb |
inbox-stores | mongodb-inbox | mongodb |
When provider-native change-stream captures are configured, the following additional runtime entries appear:
| Surface | Entry id | Notes |
|---|---|---|
/engine/cdc-capture-runtimes | mongodb-change-stream-capture-pump | Provider-native execution-runtime descriptor and aggregate summary |
/engine/hosted-executions | mongodb-change-stream-capture-pump | Background-service execution entry |
/engine/execution-graphs | mongodb-change-stream-capture-flow | Provider-native change-stream execution flow |
Not shipped in this slice
Section titled “Not shipped in this slice”This pack intentionally still does not claim:
IReadStore/IWriteStoredispatch backed by MongoDB: query and command handlers should useIMongoDatabasedirectly- transaction-scoped outbox staging (no MongoDB multi-document transaction is opened; each
StageAsyncis a singleInsertOneAsync) - pack-owned dispatch loops or broker-specific retry scheduling beyond the runtime-neutral
IEventDispatchStorebridge - projection rebuild orchestration
- out-of-process or externally reported MongoDB change-stream runtime observation beyond the in-process hosted service
- broader multi-collection orchestration or low-code capture generation beyond the current one-capture-per-configured-collection path
These remain explicit later slices so the current provider claim stays truthful.