Data Flows

This page traces the four end-to-end flows that carry value through the platform: real-time prepaid charging, offline CDR ingestion, postpaid bill-cycle execution, and CRM-driven on-demand operations. Each flow crosses the Reactor, a worker thread, the database, and — for the prepaid path — a DIAMETER peer.

Real-Time Prepaid Charging (DIAMETER)

A subscriber's session opens with a CC-Request-Type=INITIAL_REQUEST, may issue zero or more UPDATE_REQUEST mid-session, and closes with TERMINATION_REQUEST. Single events (SMS, MMS) use EVENT_REQUEST and skip the reservation phase.

sequenceDiagram
    autonumber
    participant NE as GGSN/PGW
    participant Acc as Acceptor
    participant PS as PrepaidServer (svc thread)
    participant Stack as DiameterStack
    participant Sub as Subscriber
    participant Rater as Rater
    participant DB as MySQL

    NE->>Acc: TCP connect
    Acc->>PS: open() → activate(THR_DETACHED)
    NE->>PS: CCR (INITIAL, units=N)
    PS->>Stack: decodeMessage()
    Stack-->>PS: DiameterCCR
    PS->>PS: BalanceReserve::fromCCR()
    PS->>Sub: new Subscriber(MSISDN)
    Sub->>DB: SELECT subscriber, balance, plan
    PS->>Rater: rate(Balance_reserve)
    Rater-->>PS: rated_price
    alt has_balance(reserved)
        PS->>Sub: reserve_balance()
        Sub->>DB: UPDATE balance.reserved
        PS->>DB: INSERT balance_reserve
        PS-->>NE: CCA (Result-Code=2001, Granted=N, Validity=30s)
    else credit limit reached
        PS-->>NE: CCA (Result-Code=4012)
    end

    Note over NE,PS: Mid-session updates repeat the BalanceReserve cycle.

    NE->>PS: CCR (TERMINATION, used=M)
    PS->>PS: Charge::fromCCR()
    PS->>Sub: new Subscriber(MSISDN)
    PS->>Rater: rate(Charge)
    PS->>Sub: debit_balance(final_price)
    Sub->>DB: UPDATE balance.balance, .reserved
    PS->>DB: INSERT charge
    PS-->>NE: CCA (Result-Code=2001)
    NE->>PS: TCP close
    PS->>PS: handle_close → thread exits

The PrepaidServer worker holds the connection for the entire session. Result codes follow RFC 4006: DIAMETER_USER_UNKNOWN (5030) when the MSISDN does not resolve, DIAMETER_END_USER_SERVICE_DENIED (4010) when the subscriber is not flagged prepaid, DIAMETER_RATING_FAILED (5031) when the rater rejects the request, and DIAMETER_CREDIT_LIMIT_REACHED (4012) on insufficient balance. Granted units default to whatever the network requested; validity time is hard-coded to 30 seconds.
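
The result-code selection described above can be sketched as a single decision function. This is a minimal illustration, not the platform's code: `ChargingContext` and `selectResultCode` are hypothetical names, and the order of the checks is an assumption. Only the numeric codes come from the text and from RFC 4006.

```cpp
#include <cstdint>

// Result-Code values named in the text (RFC 4006), plus DIAMETER_SUCCESS.
enum class ResultCode : std::uint32_t {
    Success              = 2001,
    EndUserServiceDenied = 4010,
    CreditLimitReached   = 4012,
    UserUnknown          = 5030,
    RatingFailed         = 5031,
};

// Hypothetical snapshot of what the worker has learned about one request.
struct ChargingContext {
    bool subscriber_found;  // MSISDN resolved to a subscriber
    bool is_prepaid;        // subscriber is flagged prepaid
    bool rating_ok;         // rater accepted the request
    bool has_balance;       // balance covers the rated price
};

// Assumed check order: identity, then eligibility, then rating, then balance.
constexpr ResultCode selectResultCode(const ChargingContext& ctx) {
    if (!ctx.subscriber_found) return ResultCode::UserUnknown;
    if (!ctx.is_prepaid)       return ResultCode::EndUserServiceDenied;
    if (!ctx.rating_ok)        return ResultCode::RatingFailed;
    if (!ctx.has_balance)      return ResultCode::CreditLimitReached;
    return ResultCode::Success;
}
```

Keeping the mapping in one pure function makes each failure branch easy to unit-test without a DIAMETER peer.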

A 1 MB cap on incoming DIAMETER message length is enforced in PrepaidServer::svc() to defend against malformed length fields used as a DoS vector.
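
A length check of this kind might look like the sketch below. It assumes the check runs on the fixed 20-byte DIAMETER header before the body is read. `checkedMessageLength` and both constants are hypothetical names, and treating exactly 1 MB as acceptable is an assumption.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>

constexpr std::size_t   kDiameterHeaderLen = 20;           // fixed header size
constexpr std::uint32_t kMaxMessageLen     = 1024 * 1024;  // 1 MB cap from the text

// Returns the declared message length when it is plausible, std::nullopt otherwise.
// `hdr` points at the received header bytes; `available` is how many were read.
std::optional<std::uint32_t> checkedMessageLength(const std::uint8_t* hdr,
                                                  std::size_t available) {
    if (hdr == nullptr || available < kDiameterHeaderLen) return std::nullopt;

    // Byte 0 is the version; bytes 1..3 carry the 24-bit big-endian length.
    const std::uint32_t len = (std::uint32_t{hdr[1]} << 16) |
                              (std::uint32_t{hdr[2]} << 8)  |
                               std::uint32_t{hdr[3]};

    if (len < kDiameterHeaderLen) return std::nullopt;  // shorter than its own header
    if (len > kMaxMessageLen)     return std::nullopt;  // over the cap: reject before allocating
    return len;
}
```

The 24-bit field can declare almost 16 MB, so rejecting oversized values before allocating a receive buffer is what bounds the memory one peer can force the worker to commit.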

Offline CDR Ingestion (Postpaid)

The mediation system drops protobuf-serialized billing.CDR records into cdr/new/. The CDR loader is a timer handler whose interval (typically 60s) governs polling cadence.

sequenceDiagram
    autonumber
    participant Med as Mediation
    participant FS as cdr/new/
    participant Reactor as ACE_Reactor
    participant Loader as CDRLoader
    participant Pool as ThreadPool
    participant Rater as Rater
    participant DB as MySQL

    Med->>FS: Drop *.cdr
    Reactor->>Loader: handle_timeout()
    Loader->>Loader: is_processing.exchange(true)
    Loader->>Pool: enqueue(scan_directory)
    Pool->>FS: list new_cdr_path
    loop per *.cdr file
        Pool->>Pool: CDR::deserialize(ifstream)
        Pool->>Pool: CDR::validate() — A-number + service id
        alt invalid
            Pool->>FS: rename → cdr/error/<file>.error
        else valid
            Pool->>DB: cdrExists(mediation_id)
            alt duplicate
                Pool->>FS: rename → cdr/duplicate/<file>.duplicate
                Pool->>DB: INSERT notification (duplicate rejected)
            else new
                Pool->>Rater: pre_rate (apply free units)
                Pool->>Rater: rate (compute final_price)
                Pool->>Rater: post_rate (volume discounts hook)
                Pool->>DB: INSERT cdr
                alt insert ok
                    Pool->>FS: rename → cdr/processed/<file>.done
                else duplicate (race-detected)
                    Pool->>FS: rename → cdr/duplicate/<file>.duplicate
                else insert fails
                    Pool->>FS: rename → cdr/error/<file>.error
                end
            end
        end
    end
    Pool->>Loader: is_processing = false

CDRLoader::handle_timeout returns immediately if a previous cycle is still running, so two threads never compete for the same files. Each inserted CDR row is stamped with status EVENT_STATUS_RATED (2); the bill-cycle stage later transitions it to EVENT_STATUS_BILLED (4). Ingest is idempotent: the protobuf CDR.id is persisted as offline.mediation_id under a UNIQUE constraint, and the loader pre-checks with DB_layer::cdrExists so that the common case never reaches the duplicate-key handler. The constraint remains the backstop for duplicates that slip past the pre-check (the race-detected branch in the diagram).
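
The re-entrancy check shown in the diagram as `is_processing.exchange(true)` can be sketched with a `std::atomic<bool>`. `CycleGuard` and its method names are hypothetical; only the exchange-based idea comes from the diagram.

```cpp
#include <atomic>

// Minimal sketch of a "one cycle at a time" guard.
class CycleGuard {
public:
    // exchange() returns the previous value, so exactly one caller sees
    // `false` and wins the cycle; everyone else sees `true` and backs off.
    bool tryBegin() {
        return !is_processing_.exchange(true, std::memory_order_acq_rel);
    }

    // Called by the worker once the directory scan has finished.
    void end() {
        is_processing_.store(false, std::memory_order_release);
    }

private:
    std::atomic<bool> is_processing_{false};
};
```

A timer handler built on this would return early whenever `tryBegin()` yields `false`, and the pool task would call `end()` as its last step, including on error paths.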

Postpaid Bill-Cycle Execution

Bill-cycle execution is queued through the activities table, written by CRM and drained by the billing handlers. Each operator deployment runs N billing processes (one per bill-cycle), each filtering on its own billing.billcycle.

sequenceDiagram
    autonumber
    participant CRM as CRM (gRPC client)
    participant Srv as CRMServiceImpl
    participant DB as MySQL
    participant Reactor as ACE_Reactor
    participant BH as BillingHandler
    participant Pool as ThreadPool
    participant Rater as Rater
    participant Notif as notifications

    CRM->>Srv: ExecuteBillCycle(cycle, month, year)
    Srv->>DB: INSERT activity(type=FULL_BILLRUN, param=cycle, param2=month, param3=year)
    Reactor->>BH: handle_timeout()
    BH->>DB: evictStaleActivities(stale_s)
    Note over DB: Watchdog: PROCESSING rows<br/>with old heartbeat → ERROR
    BH->>DB: get_new_activity()
    Note over DB: NOT EXISTS gate skips rows<br/>whose cycle is already PROCESSING
    alt cycle already busy
        DB-->>BH: nullptr (row stays PENDING; retry next tick)
    else cycle free
        DB-->>BH: Activity{type=FULL_BILLRUN, param=cycle, started_at, heartbeat_at}
        BH->>Pool: enqueue(processActivity)
        Pool->>Pool: Match activity.param to configured billcycle
        alt cycle matches
            Pool->>DB: billCycle(activity, rater)
            Note over DB: Iterate postpaid subscribers,<br/>heartbeat every N subs,<br/>aggregate CDRs, stamp tax,<br/>INSERT bill, UPDATE cdr.status_id=BILLED
            DB-->>Pool: 0 / -1
            Pool->>DB: close_activity(status=COMPLETED|ERROR, finished_at)
            Pool->>Notif: insert "Billrun completed for cycle X"
        else cycle mismatch
            Pool-->>Pool: ret = -2 (silently skip)
        end
    end

The NOT EXISTS clause inside get_new_activity is the cross-process gate: a second FULL_BILLRUN request for a cycle that is already being worked is silently skipped on every tick until the first run completes (or the watchdog evicts a presumed-dead worker). The skipped row stays PENDING and is dispatched on the first tick after the gate clears. See Postpaid Billing for the heartbeat cadence and the threshold tuning.
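
The effect of the gate can be illustrated in memory. In the sketch below a vector stands in for the activities table, and `Activity`, `ActivityStatus`, and `pickNextActivity` are hypothetical names. The real gate is a SQL predicate, and this sketch does not reproduce its cross-process atomicity.

```cpp
#include <algorithm>
#include <optional>
#include <vector>

enum class ActivityStatus { Pending, Processing, Completed, Error };

// Hypothetical, trimmed-down view of one activities row.
struct Activity {
    int id;
    int cycle;
    ActivityStatus status;
};

// Returns the id of the first PENDING row whose cycle has no PROCESSING row,
// mirroring what a NOT EXISTS subquery would filter out.
std::optional<int> pickNextActivity(const std::vector<Activity>& rows) {
    for (const Activity& candidate : rows) {
        if (candidate.status != ActivityStatus::Pending) continue;

        const bool cycle_busy = std::any_of(
            rows.begin(), rows.end(), [&](const Activity& other) {
                return other.cycle == candidate.cycle &&
                       other.status == ActivityStatus::Processing;
            });

        if (!cycle_busy) return candidate.id;  // gate is clear for this cycle
    }
    return std::nullopt;  // nothing dispatchable on this tick
}
```

With one PROCESSING row and one PENDING row for the same cycle, the function returns nothing. Once the first row leaves PROCESSING, the pending row is picked on the next call.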

On-demand single-subscriber billing follows the same activity table but with type=SINGLE_BILLING and the MSISDN in param. BillingHandler::billOne() resolves MSISDN → subscriber id, then either bills exactly one (year, month) period or, if both are zero, walks every distinct unbilled period returned by DB_layer::getUnbilledPeriods(). Each period yields one billing::Bill row stamped with a unique billing id, the subscriber's tax rate, and the four service totals; every CDR included in that bill has its status_id advanced to EVENT_STATUS_BILLED.
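
The period-selection rule for single-subscriber billing can be sketched as follows. `Period` and `periodsToBill` are hypothetical names, and the `unbilled` argument stands in for whatever DB_layer::getUnbilledPeriods() returns.

```cpp
#include <utility>
#include <vector>

using Period = std::pair<int, int>;  // (year, month)

// (0, 0) means "bill every unbilled period"; anything else means
// "bill exactly the requested period".
std::vector<Period> periodsToBill(int year, int month,
                                  const std::vector<Period>& unbilled) {
    if (year == 0 && month == 0) return unbilled;
    return {Period{year, month}};
}
```

Each returned period would then produce one bill row, so a catch-up request for a subscriber with three unbilled months yields three bills.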

PDF Bill Formatting

The bill formatter runs as its own bill_formatter role. Its timer cycle picks up bills inserted by the billing handlers.

sequenceDiagram
    autonumber
    participant Reactor as ACE_Reactor
    participant BFH as BillFormatterHandler
    participant Pool as ThreadPool
    participant DB as MySQL
    participant Inja as Inja
    participant PB as plutobook
    participant FS as invoices/<y>/<m>/bc_<n>/

    Reactor->>BFH: handle_timeout()
    BFH->>DB: getUnformattedBills()
    DB-->>BFH: [bill_id, ...]
    loop per bill
        BFH->>Pool: enqueue(processBill(bill_id))
        Pool->>DB: lockBillForFormatting(bill_id) → bill, sub
        alt locked successfully
            Pool->>DB: REFDATA::getCurrency(sub.currency_id) → rate, symbol
            alt sub.prepaid == 1
                Pool->>DB: getPrepaidBalanceHistory(rate, symbol)
                Pool->>DB: getUsageHistory("online", billing_id, rate, symbol)
            else postpaid
                Pool->>DB: getUsageHistory("offline", billing_id, rate, symbol)
            end
            Note over Pool: Build JSON: convert every EUR amount,<br/>format VAT block (rate%, tax_amount, total_with_tax),<br/>include reissue line if reissue_count > 0
            Pool->>Inja: render(prepaid|postpaid.html, context)
            Inja-->>Pool: HTML
            Pool->>PB: Book.loadHtml() + writeToPdf()
            PB->>FS: write <billing_id>.pdf
            Pool->>DB: commitBillFormatted(bill_id, PAID|UNPAID, url)
            Pool->>DB: updateEventStatusToBilled(billing_id)
            Pool->>DB: INSERT notification (success)
        else lock not acquired
            Note over Pool: Skip — another worker owns it
        end
    end

lockBillForFormatting uses an atomic transition (BILL_STATUS_NEW → BILL_STATUS_PROCESSING) so multiple bill-formatter instances can run in parallel without double-rendering. On failure, rollbackBillFormatting returns the row to BILL_STATUS_NEW.
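
The claim-then-render pattern can be sketched in process with a compare-and-swap. This is an analogy only: the real transition happens in the database, and the numeric status values and the functions `tryLockBill` and `rollbackBill` are assumptions made for the example.

```cpp
#include <atomic>

// Status names follow the text; the numeric values are assumed.
enum BillStatus : int {
    BILL_STATUS_NEW        = 0,
    BILL_STATUS_PROCESSING = 1,
};

// Succeeds only for the single caller that observes NEW and flips it to
// PROCESSING; any concurrent caller sees PROCESSING and gets false.
bool tryLockBill(std::atomic<int>& status) {
    int expected = BILL_STATUS_NEW;
    return status.compare_exchange_strong(expected, BILL_STATUS_PROCESSING);
}

// Returns a bill to NEW after a failed render so a later cycle can retry it.
void rollbackBill(std::atomic<int>& status) {
    status.store(BILL_STATUS_NEW);
}
```

A database version of the same idea would typically be a conditional update that only matches rows still in the NEW state, with the worker checking whether a row was actually changed.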

On-Demand CRM Calls

Every CRM RPC is intercepted by AuthInterceptor, which extracts the Bearer token from gRPC metadata and validates it via JWTValidator::validate(). The validator parses the JWT, verifies the HS256 signature against the configured secret, and checks the issuer and audience claims.

sequenceDiagram
    autonumber
    participant PHP as crm2.micro.bss
    participant Auth as AuthInterceptor
    participant JWT as JWTValidator
    participant Impl as CRMServiceImpl
    participant Cache as RefData cache
    participant DB as MySQL

    PHP->>Auth: gRPC call + metadata.authorization=Bearer ...
    Auth->>JWT: validate(token)
    alt invalid
        Auth-->>PHP: UNAUTHENTICATED
    else valid
        Auth->>Impl: continue
        alt RPC == GetRefData
            Impl->>Cache: getCachedRefData(db)
            alt fresh (< 60s)
                Cache-->>Impl: cached RefData
            else stale
                Impl->>DB: getRefData()
                Impl->>Cache: refresh + timestamp
            end
            Impl-->>PHP: RefData
        else read RPC
            Impl->>DB: SELECT ...
            Impl-->>PHP: stream / single response
        else write RPC (INSERT, UPDATE, REDEEM)
            Impl->>DB: transactional write
            Impl-->>PHP: response
        else billing trigger (ExecuteBilling/ExecuteBillCycle)
            Impl->>DB: INSERT activity
            Impl-->>PHP: queued ack
        end
    end
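
The first step of the interceptor, pulling the token out of the authorization metadata value, might look like the sketch below. `extractBearerToken` is a hypothetical helper, and signature and claim verification are deliberately left out because they depend on a crypto library.

```cpp
#include <optional>
#include <string>
#include <string_view>

// Returns the token that follows "Bearer " in an authorization header value,
// or std::nullopt when the scheme is different or the token is empty.
// Simplification: the scheme is matched with exact case.
std::optional<std::string> extractBearerToken(std::string_view header) {
    constexpr std::string_view prefix = "Bearer ";
    if (header.size() <= prefix.size()) return std::nullopt;  // no token present
    if (header.substr(0, prefix.size()) != prefix) return std::nullopt;
    return std::string(header.substr(prefix.size()));
}
```

A missing value here would map to the UNAUTHENTICATED branch in the diagram, before the validator is ever invoked.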

GetRefData is hot — every CRM page load invokes it — so CRMServiceImpl keeps a 60-second TTL cache (REF_CACHE_TTL) guarded by ref_cache_mutex_ to avoid hammering the database from multiple gRPC worker threads.
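
A mutex-guarded TTL cache of this shape can be sketched generically. `TtlCache` is a hypothetical class. The injectable `now` parameter exists only to make the expiry logic testable and is not something the text describes.

```cpp
#include <chrono>
#include <functional>
#include <mutex>
#include <optional>
#include <utility>

template <typename T>
class TtlCache {
public:
    using Clock = std::chrono::steady_clock;

    // `loader` stands in for the database read (getRefData in the diagram).
    TtlCache(Clock::duration ttl, std::function<T()> loader)
        : ttl_(ttl), loader_(std::move(loader)) {}

    // Returns the cached value while it is younger than the TTL,
    // otherwise reloads it and restarts the clock.
    T get(Clock::time_point now = Clock::now()) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!value_ || now - loaded_at_ >= ttl_) {
            value_ = loader_();
            loaded_at_ = now;
        }
        return *value_;
    }

private:
    Clock::duration ttl_;
    std::function<T()> loader_;
    std::mutex mutex_;
    std::optional<T> value_;
    Clock::time_point loaded_at_{};
};
```

Holding the mutex while the loader runs means concurrent callers wait for a single reload instead of each hitting the database, at the cost of blocking readers for the duration of that one query.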

Reference-Data Lookup Path

Rating is the hottest read path. To stay sub-millisecond it never touches the database — every price, every free-unit allowance, every service code resolves through the in-memory REFDATA singleton.

graph LR
    Startup["Startup<br/>(main.cpp)"] -->|"REFDATA::instance()<br/>→ Refdata::Refdata()"| Load["DB_layer::loadRef()<br/>DB_layer::loadCurrencies()<br/>DB_layer::loadTax()"]
    Load --> Tables[("ref_price_plan<br/>ref_service<br/>ref_resource<br/>ref_status<br/>ref_event_status<br/>ref_billcycle<br/>ref_voucher_status<br/>ref_currency<br/>ref_tax")]
    Tables --> Maps["Refdata in-memory maps<br/>(std::shared_mutex)"]

    Maps --> Rater["Rater::rate()"]
    Maps --> Validate["CDR::validate()"]
    Maps --> CRM["CRMServiceImpl::GetRefData()"]
    Maps --> Fmt["BillFormatterHandler::processBill()<br/>(getCurrency, stampTaxForBill)"]

    CRMAdmin["CRM admin action<br/>(plan change, etc.)"] -->|"REFDATA::instance()->refresh()"| Maps
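
The read-mostly access pattern in the graph, many concurrent lookups with an occasional full refresh, can be sketched with `std::shared_mutex`. `PriceTable` is a hypothetical, single-map stand-in for the Refdata maps, and storing prices as integer minor units is an assumption made for the example.

```cpp
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>

class PriceTable {
public:
    // Readers take a shared lock, so concurrent rating threads do not
    // block one another.
    std::optional<long long> lookup(const std::string& service_code) const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        const auto it = prices_.find(service_code);
        if (it == prices_.end()) return std::nullopt;
        return it->second;
    }

    // refresh() takes the exclusive lock only for the swap; the new map is
    // built by the caller before the lock is acquired.
    void refresh(std::unordered_map<std::string, long long> fresh) {
        std::unique_lock<std::shared_mutex> lock(mutex_);
        prices_ = std::move(fresh);
    }

private:
    mutable std::shared_mutex mutex_;
    std::unordered_map<std::string, long long> prices_;
};
```

Because the replacement map is loaded outside the lock, readers are paused only for the move assignment, which keeps a refresh from stalling the rating path.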