Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/5.2.0 #788

Merged
merged 4 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
Version 5.2.0 (2023-06-02)
--------------------------
Add a filter API to plugins to decide whether to track an event or not (#783)
Add version to default remote configuration and don't update unless remote configuration is newer (#779)
Handle unprotected access to sending state in Emitter from concurrent threads (#774)

Version 5.1.0 (2023-05-11)
--------------------------
Track new properties in platform context version 1-0-3 and make it configurable which properties to track (#771)
Expand Down
2 changes: 1 addition & 1 deletion Examples
2 changes: 1 addition & 1 deletion SnowplowTracker.podspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Pod::Spec.new do |s|
s.name = "SnowplowTracker"
s.version = "5.1.0"
s.version = "5.2.0"
s.summary = "Snowplow event tracker for iOS, macOS, tvOS, watchOS for apps and games."
s.description = <<-DESC
Snowplow is a mobile and event analytics platform with a difference: rather than tell our users how they should analyze their data, we deliver their event-level data in their own data warehouse, on their own Amazon Redshift or Postgres database, so they can analyze it any way they choose. Snowplow mobile is used by data-savvy games companies and app developers to better understand their users and how they engage with their games and applications. Snowplow is open source using the business-friendly Apache License, Version 2.0 and scales horizontally to many billions of events.
Expand Down
91 changes: 59 additions & 32 deletions Sources/Core/Emitter/Emitter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ let POST_WRAPPER_BYTES = 88
class Emitter: NSObject, EmitterEventProcessing {

private var timer: Timer?
/// Whether the emitter is currently sending.
private(set) var isSending = false
private var dataOperationQueue: OperationQueue = OperationQueue()
private var builderFinished = false

private var pausedEmit = false

private var sendingCheck = SendingCheck()
/// Whether the emitter is currently sending.
var isSending: Bool { return sendingCheck.sending }

private var _urlEndpoint: String?
/// Collector endpoint.
Expand Down Expand Up @@ -330,29 +330,22 @@ class Emitter: NSObject, EmitterEventProcessing {

/// Allows sending events to collector.
func resumeEmit() {
pausedEmit = false
sendingCheck.pausedEmit = false
flush()
}

/// Suspends sending events to collector.
func pauseEmit() {
pausedEmit = true
sendingCheck.pausedEmit = true
}

/// Insert a Payload object into the buffer to be sent to collector.
/// This method will add the payload to the database and flush (send all events).
/// - Parameter eventPayload: A Payload containing a completed event to be added into the buffer.
func addPayload(toBuffer eventPayload: Payload) {
weak var weakSelf = self

DispatchQueue.global(qos: .default).async {
let strongSelf = weakSelf
if strongSelf == nil {
return
}

strongSelf?.eventStore?.addEvent(eventPayload)
strongSelf?.flush()
DispatchQueue.global(qos: .default).async { [weak self] in
self?.eventStore?.addEvent(eventPayload)
self?.flush()
}
}

Expand All @@ -369,23 +362,19 @@ class Emitter: NSObject, EmitterEventProcessing {

// MARK: - Control methods

func sendGuard() {
if isSending || pausedEmit {
return
}
objc_sync_enter(self)
if !isSending && !pausedEmit {
isSending = true
private func sendGuard() {
if sendingCheck.requestToStartSending() {
objc_sync_enter(self)
attemptEmit()
objc_sync_exit(self)
sendingCheck.sending = false
}
objc_sync_exit(self)
}

func attemptEmit() {
private func attemptEmit() {
guard let eventStore = eventStore else { return }
if eventStore.count() == 0 {
logDebug(message: "Database empty. Returning.")
isSending = false
return
}

Expand Down Expand Up @@ -435,14 +424,13 @@ class Emitter: NSObject, EmitterEventProcessing {
if failedWillRetryCount > 0 && successCount == 0 {
logDebug(message: "Ending emitter run as all requests failed.")
Thread.sleep(forTimeInterval: 5)
isSending = false
return
} else {
self.attemptEmit()
}
}

func buildRequests(fromEvents events: [EmitterEvent]) -> [Request] {
private func buildRequests(fromEvents events: [EmitterEvent]) -> [Request] {
var requests: [Request] = []
guard let networkConnection = networkConnection else { return requests }

Expand Down Expand Up @@ -503,16 +491,16 @@ class Emitter: NSObject, EmitterEventProcessing {
return requests
}

func isOversize(_ payload: Payload) -> Bool {
private func isOversize(_ payload: Payload) -> Bool {
return isOversize(payload, previousPayloads: [])
}

func isOversize(_ payload: Payload, previousPayloads: [Payload]) -> Bool {
private func isOversize(_ payload: Payload, previousPayloads: [Payload]) -> Bool {
let byteLimit = networkConnection?.httpMethod == .get ? byteLimitGet : byteLimitPost
return isOversize(payload, byteLimit: byteLimit, previousPayloads: previousPayloads)
}

func isOversize(_ payload: Payload, byteLimit: Int, previousPayloads: [Payload]) -> Bool {
private func isOversize(_ payload: Payload, byteLimit: Int, previousPayloads: [Payload]) -> Bool {
var totalByteSize = payload.byteSize
for previousPayload in previousPayloads {
totalByteSize += previousPayload.byteSize
Expand All @@ -529,3 +517,42 @@ class Emitter: NSObject, EmitterEventProcessing {
pauseTimer()
}
}

fileprivate class SendingCheck {
private var _sending = false
var sending: Bool {
get {
return lock { return _sending }
}
set {
lock { _sending = newValue }
}
}

private var _pausedEmit = false
var pausedEmit: Bool {
get {
return lock { return _pausedEmit }
}
set {
lock { _pausedEmit = newValue }
}
}

func requestToStartSending() -> Bool {
return lock {
if !_sending && !_pausedEmit {
_sending = true
return true
} else {
return false
}
}
}

private func lock<T>(closure: () -> T) -> T {
objc_sync_enter(self)
defer { objc_sync_exit(self) }
return closure()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@

import Foundation

class GlobalContextPluginConfiguration: ConfigurationProtocol, PluginConfigurationProtocol {
class GlobalContextPluginConfiguration: ConfigurationProtocol, PluginIdentifiable, PluginEntitiesCallable {
private(set) var identifier: String
private(set) var globalContext: GlobalContext
private(set) var afterTrackConfiguration: PluginAfterTrackConfiguration? = nil
private(set) var entitiesConfiguration: PluginEntitiesConfiguration?

init(identifier: String, globalContext: GlobalContext) {
Expand Down
135 changes: 70 additions & 65 deletions Sources/Core/RemoteConfiguration/ConfigurationCache.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,94 +19,93 @@ class ConfigurationCache: NSObject {

init(remoteConfiguration: RemoteConfiguration) {
super.init()
#if !(os(tvOS)) && !(os(watchOS))
#if !(os(tvOS)) && !(os(watchOS))
createCachePath(with: remoteConfiguration)
#endif
#endif
}

func read() -> FetchedConfigurationBundle? {
objc_sync_enter(self)
defer { objc_sync_exit(self) }
#if !(os(tvOS)) && !(os(watchOS))
if let configuration = configuration {
return lock {
#if !(os(tvOS)) && !(os(watchOS))
if let configuration = configuration {
return configuration
}
load()
#endif
return configuration
}
load()
#endif
return configuration
}

func write(_ configuration: FetchedConfigurationBundle) {
objc_sync_enter(self)
self.configuration = configuration
#if !(os(tvOS)) && !(os(watchOS))
store()
#endif
objc_sync_exit(self)
lock {
self.configuration = configuration
#if !(os(tvOS)) && !(os(watchOS))
store()
#endif
}
}

func clear() {
objc_sync_enter(self)
defer { objc_sync_exit(self) }
configuration = nil
#if !(os(tvOS)) && !(os(watchOS))
if let cacheFileUrl = cacheFileUrl {
do {
try FileManager.default.removeItem(at: cacheFileUrl)
} catch let error {
logError(message: String(format: "Error on clearing configuration from cache: %@", error.localizedDescription))
lock {
configuration = nil
#if !(os(tvOS)) && !(os(watchOS))
if let cacheFileUrl = cacheFileUrl {
do {
try FileManager.default.removeItem(at: cacheFileUrl)
} catch let error {
logError(message: String(format: "Error on clearing configuration from cache: %@", error.localizedDescription))
}
}
#endif
}
#endif
}

// Private method

func load() {
objc_sync_enter(self)
defer { objc_sync_exit(self) }
guard let cacheFileUrl = cacheFileUrl,
let data = try? Data(contentsOf: cacheFileUrl) else { return }
if #available(iOS 12, tvOS 12, watchOS 5, macOS 10.14, *) {
do {
configuration = try NSKeyedUnarchiver.unarchiveTopLevelObjectWithData(data) as? FetchedConfigurationBundle
} catch let error {
logError(message: String(format: "Exception on getting configuration from cache: %@", error.localizedDescription))
configuration = nil
lock {
guard let cacheFileUrl = cacheFileUrl,
let data = try? Data(contentsOf: cacheFileUrl) else { return }
if #available(iOS 12, tvOS 12, watchOS 5, macOS 10.14, *) {
do {
configuration = try NSKeyedUnarchiver.unarchiveTopLevelObjectWithData(data) as? FetchedConfigurationBundle
} catch let error {
logError(message: String(format: "Exception on getting configuration from cache: %@", error.localizedDescription))
configuration = nil
}
} else {
let unarchiver = NSKeyedUnarchiver(forReadingWith: data)
configuration = unarchiver.decodeObject() as? FetchedConfigurationBundle
unarchiver.finishDecoding()
}
} else {
let unarchiver = NSKeyedUnarchiver(forReadingWith: data)
configuration = unarchiver.decodeObject() as? FetchedConfigurationBundle
unarchiver.finishDecoding()
}
}

func store() {
_ = DispatchQueue.global(qos: .default)
objc_sync_enter(self)
defer { objc_sync_exit(self) }

guard let configuration = configuration,
let cacheFileUrl = cacheFileUrl else { return }

do {
var data = Data()
var archiver: NSKeyedArchiver?
lock {
guard let configuration = configuration,
let cacheFileUrl = cacheFileUrl else { return }

if #available(iOS 12, tvOS 12, watchOS 5, macOS 10.14, *) {
archiver = NSKeyedArchiver(requiringSecureCoding: true)
archiver?.encode(configuration, forKey: "root")
if let encodedData = archiver?.encodedData {
data = encodedData
do {
var data = Data()
var archiver: NSKeyedArchiver?

if #available(iOS 12, tvOS 12, watchOS 5, macOS 10.14, *) {
archiver = NSKeyedArchiver(requiringSecureCoding: true)
archiver?.encode(configuration, forKey: "root")
if let encodedData = archiver?.encodedData {
data = encodedData
}
} else {
archiver = NSKeyedArchiver(forWritingWith: data as! NSMutableData)
archiver?.encode(configuration)
archiver?.finishEncoding()
}
} else {
archiver = NSKeyedArchiver(forWritingWith: data as! NSMutableData)
archiver?.encode(configuration)
archiver?.finishEncoding()
try data.write(to: cacheFileUrl, options: .atomic)
} catch let error {
logError(message: String(format: "Error on caching configuration: %@", error.localizedDescription))
}
try data.write(to: cacheFileUrl, options: .atomic)
} catch let error {
logError(message: String(format: "Error on caching configuration: %@", error.localizedDescription))
}
}

Expand All @@ -126,4 +125,10 @@ class ConfigurationCache: NSObject {
cacheFileUrl = url
}
}

private func lock<T>(closure: () -> T) -> T {
objc_sync_enter(self)
defer { objc_sync_exit(self) }
return closure()
}
}
Loading