|
|
@@ -44,52 +44,52 @@ final class BaseNightscoutManager: NightscoutManager, Injectable {
|
|
|
private let processQueue = DispatchQueue(label: "BaseNetworkManager.processQueue")
|
|
|
private var ping: TimeInterval?
|
|
|
|
|
|
- // Queue where lane pipelines run.
|
|
|
- let laneQueue = DispatchQueue(label: "NightscoutManager.lanes", qos: .utility)
|
|
|
+ // Queue where upload pipelines run.
|
|
|
+ let uploadPipelineQueue = DispatchQueue(label: "NightscoutManager.uploadPipelines", qos: .utility)
|
|
|
|
|
|
// Background Core Data context for fetches used by upload tasks.
|
|
|
var backgroundContext = CoreDataStack.shared.newTaskContext()
|
|
|
|
|
|
- /// Throttle window (seconds) per lane. Any kicks inside this window
|
|
|
- /// coalesce into a single upload run for that lane.
|
|
|
- let laneInterval: [NightscoutLane: TimeInterval] = [
|
|
|
+ /// Throttle window (seconds) per upload pipeline. Any requests inside this window
|
|
|
+ /// coalesce into a single upload run for that pipeline.
|
|
|
+ let uploadPipelineInterval: [NightscoutUploadPipeline: TimeInterval] = [
|
|
|
.carbs: 2, .pumpHistory: 2, .overrides: 2, .tempTargets: 2,
|
|
|
.glucose: 2, .manualGlucose: 2, .deviceStatus: 2
|
|
|
]
|
|
|
|
|
|
- /// Subjects used to “kick” a lane. The pipeline applies a throttle so
|
|
|
+ /// Subjects used to request an upload pipeline. The pipeline applies a throttle so
|
|
|
/// close calls don’t double-upload.
|
|
|
- var laneSubjects: [NightscoutLane: PassthroughSubject<Void, Never>] = {
|
|
|
- var d: [NightscoutLane: PassthroughSubject<Void, Never>] = [:]
|
|
|
- NightscoutLane.allCases.forEach { d[$0] = PassthroughSubject<Void, Never>() }
|
|
|
+ var uploadPipelineSubjects: [NightscoutUploadPipeline: PassthroughSubject<Void, Never>] = {
|
|
|
+ var d: [NightscoutUploadPipeline: PassthroughSubject<Void, Never>] = [:]
|
|
|
+ NightscoutUploadPipeline.allCases.forEach { d[$0] = PassthroughSubject<Void, Never>() }
|
|
|
return d
|
|
|
}()
|
|
|
|
|
|
- /// Send a kick for a lane (enqueue work). Safe to call from anywhere.
|
|
|
- func kick(_ lane: NightscoutLane) {
|
|
|
- laneSubjects[lane]?.send(())
|
|
|
+ /// Request an upload for a pipeline (enqueue work). Safe to call from anywhere.
|
|
|
+ func requestUpload(_ uploadPipeline: NightscoutUploadPipeline) {
|
|
|
+ uploadPipelineSubjects[uploadPipeline]?.send(())
|
|
|
}
|
|
|
|
|
|
- /// Build the Combine pipelines for all lanes: subject → throttle → upload.
|
|
|
+ /// Build the Combine pipelines for all upload pipelines: subject → throttle → upload.
|
|
|
/// Must be called once during init().
|
|
|
func setupLanePipelines() {
|
|
|
- for lane in NightscoutLane.allCases {
|
|
|
- guard let subject = laneSubjects[lane], let window = laneInterval[lane] else { continue }
|
|
|
+ for pipeline in NightscoutUploadPipeline.allCases {
|
|
|
+ guard let subject = uploadPipelineSubjects[pipeline], let window = uploadPipelineInterval[pipeline] else { continue }
|
|
|
subject
|
|
|
- .receive(on: laneQueue)
|
|
|
- .throttle(for: .seconds(window), scheduler: laneQueue, latest: false)
|
|
|
+ .receive(on: uploadPipelineQueue)
|
|
|
+ .throttle(for: .seconds(window), scheduler: uploadPipelineQueue, latest: false)
|
|
|
.sink { [weak self] in
|
|
|
guard let self else { return }
|
|
|
- Task(priority: .utility) { await self.runLane(lane) }
|
|
|
+ Task(priority: .utility) { await self.runUploadPipeline(pipeline) }
|
|
|
}
|
|
|
.store(in: &subscriptions)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /// Runs the actual upload for a single lane.
|
|
|
+ /// Runs the actual upload for a single upload pipeline.
|
|
|
/// Called by the throttled pipeline, not directly by callers.
|
|
|
- func runLane(_ lane: NightscoutLane) async {
|
|
|
- switch lane {
|
|
|
+ func runUploadPipeline(_ uploadPipeline: NightscoutUploadPipeline) async {
|
|
|
+ switch uploadPipeline {
|
|
|
case .carbs: await uploadCarbs()
|
|
|
case .pumpHistory: await uploadPumpHistory()
|
|
|
case .overrides: await uploadOverrides()
|
|
|
@@ -135,7 +135,7 @@ final class BaseNightscoutManager: NightscoutManager, Injectable {
|
|
|
let queue = DispatchQueue(label: "BaseNightscoutManager.queue", qos: .utility)
|
|
|
|
|
|
/// Emits changed Core Data object IDs from the app. We filter by entity names
|
|
|
- /// and kick lanes based on what changed.
|
|
|
+ /// and request upload pipelines based on what changed.
|
|
|
var coreDataPublisher: AnyPublisher<Set<NSManagedObjectID>, Never>?
|
|
|
|
|
|
/// Bag for Combine subscriptions owned by this manager.
|