Data Flows¶
This page traces the four end-to-end flows that carry value through the platform: real-time prepaid charging, offline CDR ingestion, postpaid bill-cycle execution, and CRM-driven on-demand operations. Each flow crosses the Reactor, a worker thread, the database, and — for the prepaid path — a DIAMETER peer.
Real-Time Prepaid Charging (DIAMETER)¶
A subscriber's session opens with a CC-Request-Type=INITIAL_REQUEST, may issue zero or more UPDATE_REQUEST mid-session, and closes with TERMINATION_REQUEST. Single events (SMS, MMS) use EVENT_REQUEST and skip the reservation phase.
sequenceDiagram
autonumber
participant NE as GGSN/PGW
participant Acc as Acceptor
participant PS as PrepaidServer (svc thread)
participant Stack as DiameterStack
participant Sub as Subscriber
participant Rater as Rater
participant DB as MySQL
NE->>Acc: TCP connect
Acc->>PS: open() → activate(THR_DETACHED)
NE->>PS: CCR (INITIAL, units=N)
PS->>Stack: decodeMessage()
Stack-->>PS: DiameterCCR
PS->>PS: BalanceReserve::fromCCR()
PS->>Sub: new Subscriber(MSISDN)
Sub->>DB: SELECT subscriber, balance, plan
PS->>Rater: rate(Balance_reserve)
Rater-->>PS: rated_price
alt has_balance(reserved)
PS->>Sub: reserve_balance()
Sub->>DB: UPDATE balance.reserved
PS->>DB: INSERT balance_reserve
PS-->>NE: CCA (Result-Code=2001, Granted=N, Validity=30s)
else credit limit reached
PS-->>NE: CCA (Result-Code=4012)
end
Note over NE,PS: Mid-session updates repeat the BalanceReserve cycle.
NE->>PS: CCR (TERMINATION, used=M)
PS->>PS: Charge::fromCCR()
PS->>Sub: new Subscriber(MSISDN)
PS->>Rater: rate(Charge)
PS->>Sub: debit_balance(final_price)
Sub->>DB: UPDATE balance.balance, .reserved
PS->>DB: INSERT charge
PS-->>NE: CCA (Result-Code=2001)
NE->>PS: TCP close
PS->>PS: handle_close → thread exits
The PrepaidServer worker holds the connection for the entire session. Result codes follow RFC 4006: DIAMETER_USER_UNKNOWN (5030) when the MSISDN does not resolve, DIAMETER_END_USER_SERVICE_DENIED (4010) when the subscriber is not flagged prepaid, DIAMETER_RATING_FAILED (5031) when the rater rejects the request, and DIAMETER_CREDIT_LIMIT_REACHED (4012) on insufficient balance. Granted units default to whatever the network requested; validity time is hard-coded to 30 seconds.
A 1 MB cap on incoming DIAMETER message length is enforced in PrepaidServer::svc() to defend against malformed length fields used as a DoS vector.
Offline CDR Ingestion (Postpaid)¶
The mediation system drops protobuf-serialized billing.CDR records into cdr/new/. The CDR loader is a timer handler whose interval (typically 60s) governs polling cadence.
sequenceDiagram
autonumber
participant Med as Mediation
participant FS as cdr/new/
participant Reactor as ACE_Reactor
participant Loader as CDRLoader
participant Pool as ThreadPool
participant Rater as Rater
participant DB as MySQL
Med->>FS: Drop *.cdr
Reactor->>Loader: handle_timeout()
Loader->>Loader: is_processing.exchange(true)
Loader->>Pool: enqueue(scan_directory)
Pool->>FS: list new_cdr_path
loop per *.cdr file
Pool->>Pool: CDR::deserialize(ifstream)
Pool->>Pool: CDR::validate() — A-number + service id
alt invalid
Pool->>FS: rename → cdr/error/<file>.error
else valid
Pool->>DB: cdrExists(mediation_id)
alt duplicate
Pool->>FS: rename → cdr/duplicate/<file>.duplicate
Pool->>DB: INSERT notification (duplicate rejected)
else new
Pool->>Rater: pre_rate (apply free units)
Pool->>Rater: rate (compute final_price)
Pool->>Rater: post_rate (volume discounts hook)
Pool->>DB: INSERT cdr
alt insert ok
Pool->>FS: rename → cdr/processed/<file>.done
else duplicate (race-detected)
Pool->>FS: rename → cdr/duplicate/<file>.duplicate
else insert fails
Pool->>FS: rename → cdr/error/<file>.error
end
end
end
end
Pool->>Loader: is_processing = false
CDRLoader::handle_timeout returns immediately if a previous cycle is still running; this avoids two threads competing over the same files. The status code stamped on each row is EVENT_STATUS_RATED (2); the bill-cycle stage later transitions it to EVENT_STATUS_BILLED (4). Ingest is idempotent: the protobuf CDR.id is persisted as offline.mediation_id under a UNIQUE constraint, and the loader pre-checks via DB_layer::cdrExists to keep the typical happy-path off the duplicate-key handler.
Postpaid Bill-Cycle Execution¶
Bill-cycle execution is queued through the activities table, written by CRM and drained by the billing handlers. Each operator deployment runs N billing processes (one per bill-cycle), each filtering on its own billing.billcycle.
sequenceDiagram
autonumber
participant CRM as CRM (gRPC client)
participant Srv as CRMServiceImpl
participant DB as MySQL
participant Reactor as ACE_Reactor
participant BH as BillingHandler
participant Pool as ThreadPool
participant Rater as Rater
participant Notif as notifications
CRM->>Srv: ExecuteBillCycle(cycle, month, year)
Srv->>DB: INSERT activity(type=FULL_BILLRUN, param=cycle, param2=month, param3=year)
Reactor->>BH: handle_timeout()
BH->>DB: evictStaleActivities(stale_s)
Note over DB: Watchdog: PROCESSING rows<br/>with old heartbeat → ERROR
BH->>DB: get_new_activity()
Note over DB: NOT EXISTS gate skips rows<br/>whose cycle is already PROCESSING
alt cycle already busy
DB-->>BH: nullptr (row stays PENDING; retry next tick)
else cycle free
DB-->>BH: Activity{type=FULL_BILLRUN, param=cycle, started_at, heartbeat_at}
BH->>Pool: enqueue(processActivity)
Pool->>Pool: Match activity.param to configured billcycle
alt cycle matches
Pool->>DB: billCycle(activity, rater)
Note over DB: Iterate postpaid subscribers,<br/>heartbeat every N subs,<br/>aggregate CDRs, stamp tax,<br/>INSERT bill, UPDATE cdr.status_id=BILLED
DB-->>Pool: 0 / -1
Pool->>DB: close_activity(status=COMPLETED|ERROR, finished_at)
Pool->>Notif: insert "Billrun completed for cycle X"
else cycle mismatch
Pool-->>Pool: ret = -2 (silently skip)
end
end
The NOT EXISTS clause inside get_new_activity is the cross-process gate: a second FULL_BILLRUN request for a cycle already being worked is silently skipped on every tick until the first completes (or the watchdog evicts a presumed-dead worker). The skipped row stays PENDING and is dispatched immediately on the next tick after the gate clears. See Postpaid Billing for the heartbeat cadence and the threshold tuning.
On-demand single-subscriber billing follows the same activity table but with type=SINGLE_BILLING and the MSISDN in param. BillingHandler::billOne() resolves MSISDN → subscriber id, then either bills exactly one (year, month) period or, if both are zero, walks every distinct unbilled period returned by DB_layer::getUnbilledPeriods(). Each period yields one billing::Bill row stamped with a unique billing id, the subscriber's tax rate, and the four service totals; every CDR included in that bill has its status_id advanced to EVENT_STATUS_BILLED.
PDF Bill Formatting¶
The bill formatter runs as its own bill_formatter role. Its timer cycle picks up bills inserted by the billing handlers.
sequenceDiagram
autonumber
participant Reactor as ACE_Reactor
participant BFH as BillFormatterHandler
participant Pool as ThreadPool
participant DB as MySQL
participant Inja as Inja
participant PB as plutobook
participant FS as invoices/<y>/<m>/bc_<n>/
Reactor->>BFH: handle_timeout()
BFH->>DB: getUnformattedBills()
DB-->>BFH: [bill_id, ...]
loop per bill
BFH->>Pool: enqueue(processBill(bill_id))
Pool->>DB: lockBillForFormatting(bill_id) → bill, sub
alt locked successfully
Pool->>DB: REFDATA::getCurrency(sub.currency_id) → rate, symbol
alt sub.prepaid == 1
Pool->>DB: getPrepaidBalanceHistory(rate, symbol)
Pool->>DB: getUsageHistory("online", billing_id, rate, symbol)
else postpaid
Pool->>DB: getUsageHistory("offline", billing_id, rate, symbol)
end
Note over Pool: Build JSON: convert every EUR amount,<br/>format VAT block (rate%, tax_amount, total_with_tax),<br/>include reissue line if reissue_count > 0
Pool->>Inja: render(prepaid|postpaid.html, context)
Inja-->>Pool: HTML
Pool->>PB: Book.loadHtml() + writeToPdf()
PB->>FS: write <billing_id>.pdf
Pool->>DB: commitBillFormatted(bill_id, PAID|UNPAID, url)
Pool->>DB: updateEventStatusToBilled(billing_id)
Pool->>DB: INSERT notification (success)
else lock not acquired
Note over Pool: Skip — another worker owns it
end
end
lockBillForFormatting uses an atomic transition (BILL_STATUS_NEW → BILL_STATUS_PROCESSING) so multiple bill-formatter instances can run in parallel without double-rendering. On failure, rollbackBillFormatting returns the row to BILL_STATUS_NEW.
On-Demand CRM Calls¶
Every CRM RPC is intercepted by AuthInterceptor, which extracts the Bearer token from gRPC metadata and validates it via JWTValidator::validate(). The validator parses the JWT, verifies the HS256 signature against the configured secret, and checks the issuer and audience claims.
sequenceDiagram
autonumber
participant PHP as crm2.micro.bss
participant Auth as AuthInterceptor
participant JWT as JWTValidator
participant Impl as CRMServiceImpl
participant Cache as RefData cache
participant DB as MySQL
PHP->>Auth: gRPC call + metadata.authorization=Bearer ...
Auth->>JWT: validate(token)
alt invalid
Auth-->>PHP: UNAUTHENTICATED
else valid
Auth->>Impl: continue
alt RPC == GetRefData
Impl->>Cache: getCachedRefData(db)
alt fresh (< 60s)
Cache-->>Impl: cached RefData
else stale
Impl->>DB: getRefData()
Impl->>Cache: refresh + timestamp
end
Impl-->>PHP: RefData
else read RPC
Impl->>DB: SELECT ...
Impl-->>PHP: stream / single response
else write RPC (INSERT, UPDATE, REDEEM)
Impl->>DB: transactional write
Impl-->>PHP: response
else billing trigger (ExecuteBilling/ExecuteBillCycle)
Impl->>DB: INSERT activity
Impl-->>PHP: queued ack
end
end
GetRefData is hot — every CRM page load invokes it — so CRMServiceImpl keeps a 60-second TTL cache (REF_CACHE_TTL) guarded by ref_cache_mutex_ to avoid hammering the database from multiple gRPC worker threads.
Reference-Data Lookup Path¶
Rating is the hottest read path. To stay sub-millisecond it never touches the database — every price, every free-unit allowance, every service code resolves through the in-memory REFDATA singleton.
graph LR
Startup["Startup<br/>(main.cpp)"] -->|"REFDATA::instance()<br/>→ Refdata::Refdata()"| Load["DB_layer::loadRef()<br/>DB_layer::loadCurrencies()<br/>DB_layer::loadTax()"]
Load --> Tables[("ref_price_plan<br/>ref_service<br/>ref_resource<br/>ref_status<br/>ref_event_status<br/>ref_billcycle<br/>ref_voucher_status<br/>ref_currency<br/>ref_tax")]
Tables --> Maps["Refdata in-memory maps<br/>(std::shared_mutex)"]
Maps --> Rater["Rater::rate()"]
Maps --> Validate["CDR::validate()"]
Maps --> CRM["CRMServiceImpl::GetRefData()"]
Maps --> Fmt["BillFormatterHandler::processBill()<br/>(getCurrency, stampTaxForBill)"]
CRMAdmin["CRM admin action<br/>(plan change, etc.)"] -->|"REFDATA::instance()->refresh()"| Maps