ANR Defensive Coding (#1132)

* Made a number of changes to try and improve background ANRs

Added some more logs to the BatchMessageReceiveJob (to make it easier to track a specific job)
Shifted the ConversationActivity adapter initialisation to run on a background thread to reduce the hang when opening a conversation
Updated the ConversationViewModel to cache the recipient and openGroup values to avoid accessing the database unnecessarily
Updated the code to just stop all current closed group pollers instead of fetching a list to stop
Updated the PN registration to be triggered in an AsyncTask
Updated the call code to unregister a couple of additional receivers
Updated the background poller so it waits for 15 mins before running and doesn't replace the existing scheduler (allows for PNs to trigger explicit background polling)
Fixed an issue where we were sending push notifications which were too large and likely to fail as a result (non-pre-offer call messages)
Fixed an issue where a failing Open Group poller could prevent the background poller from receiving and processing DMs

* Updated to a more coroutine-y convention
This commit is contained in:
Morgan Pretty 2023-03-31 13:24:36 +11:00 committed by GitHub
parent 5e28af2be4
commit eb739bdc9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 269 additions and 73 deletions

View File

@ -43,6 +43,7 @@ dependencies {
implementation "androidx.lifecycle:lifecycle-runtime-ktx:$lifecycleVersion"
implementation "androidx.lifecycle:lifecycle-livedata-ktx:$lifecycleVersion"
implementation "androidx.lifecycle:lifecycle-process:$lifecycleVersion"
implementation "androidx.lifecycle:lifecycle-extensions:2.2.0"
implementation "androidx.paging:paging-runtime-ktx:$pagingVersion"
implementation 'androidx.activity:activity-ktx:1.5.1'
implementation 'androidx.fragment:fragment-ktx:1.5.3'

View File

@ -272,7 +272,7 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
if (poller != null) {
poller.stopIfNeeded();
}
ClosedGroupPollerV2.getShared().stop();
ClosedGroupPollerV2.getShared().stopAll();
}
@Override
@ -452,11 +452,15 @@ public class ApplicationContext extends Application implements DefaultLifecycleO
String token = task.getResult().getToken();
String userPublicKey = TextSecurePreferences.getLocalNumber(this);
if (userPublicKey == null) return Unit.INSTANCE;
if (TextSecurePreferences.isUsingFCM(this)) {
LokiPushNotificationManager.register(token, userPublicKey, this, force);
} else {
LokiPushNotificationManager.unregister(token, this);
}
AsyncTask.THREAD_POOL_EXECUTOR.execute(() -> {
if (TextSecurePreferences.isUsingFCM(this)) {
LokiPushNotificationManager.register(token, userPublicKey, this, force);
} else {
LokiPushNotificationManager.unregister(token, this);
}
});
return Unit.INSTANCE;
});
}

View File

@ -34,6 +34,7 @@ import com.annimon.stream.Stream
import dagger.hilt.android.AndroidEntryPoint
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import network.loki.messenger.R
import network.loki.messenger.databinding.ActivityConversationV2Binding
import network.loki.messenger.databinding.ViewVisibleMessageBinding
@ -113,6 +114,7 @@ import org.thoughtcrime.securesms.permissions.Permissions
import org.thoughtcrime.securesms.reactions.ReactionsDialogFragment
import org.thoughtcrime.securesms.reactions.any.ReactWithAnyEmojiDialogFragment
import org.thoughtcrime.securesms.util.*
import java.lang.ref.WeakReference
import java.util.*
import java.util.concurrent.ExecutionException
import java.util.concurrent.atomic.AtomicLong
@ -304,12 +306,13 @@ class ConversationActivityV2 : PassphraseRequiredActionBarActivity(), InputBarDe
// messageIdToScroll
messageToScrollTimestamp.set(intent.getLongExtra(SCROLL_MESSAGE_ID, -1))
messageToScrollAuthor.set(intent.getParcelableExtra(SCROLL_MESSAGE_AUTHOR))
val thread = threadDb.getRecipientForThreadId(viewModel.threadId)
if (thread == null) {
val recipient = viewModel.recipient
val openGroup = recipient.let { viewModel.openGroup }
if (recipient == null || (recipient.isOpenGroupRecipient && openGroup == null)) {
Toast.makeText(this, "This thread has been deleted.", Toast.LENGTH_LONG).show()
return finish()
}
setUpRecyclerView()
setUpToolBar()
setUpInputBar()
setUpLinkPreviewObserver()
@ -336,22 +339,31 @@ class ConversationActivityV2 : PassphraseRequiredActionBarActivity(), InputBarDe
}
}
}
unreadCount = mmsSmsDb.getUnreadCount(viewModel.threadId)
updateUnreadCountIndicator()
setUpTypingObserver()
setUpRecipientObserver()
updateSubtitle()
getLatestOpenGroupInfoIfNeeded()
setUpBlockedBanner()
binding!!.searchBottomBar.setEventListener(this)
setUpSearchResultObserver()
scrollToFirstUnreadMessageIfNeeded()
showOrHideInputIfNeeded()
setUpMessageRequestsBar()
viewModel.recipient?.let { recipient ->
if (recipient.isOpenGroupRecipient && viewModel.openGroup == null) {
Toast.makeText(this, "This thread has been deleted.", Toast.LENGTH_LONG).show()
return finish()
val weakActivity = WeakReference(this)
lifecycleScope.launch(Dispatchers.IO) {
unreadCount = mmsSmsDb.getUnreadCount(viewModel.threadId)
// Note: We are accessing the `adapter` property because we want it to be loaded on
// the background thread to avoid blocking the UI thread and potentially hanging when
// transitioning to the activity
weakActivity.get()?.adapter ?: return@launch
withContext(Dispatchers.Main) {
setUpRecyclerView()
setUpTypingObserver()
setUpRecipientObserver()
getLatestOpenGroupInfoIfNeeded()
setUpSearchResultObserver()
scrollToFirstUnreadMessageIfNeeded()
}
}
@ -631,6 +643,8 @@ class ConversationActivityV2 : PassphraseRequiredActionBarActivity(), InputBarDe
// region Animation & Updating
override fun onModified(recipient: Recipient) {
viewModel.updateRecipient()
runOnUiThread {
val threadRecipient = viewModel.recipient ?: return@runOnUiThread
if (threadRecipient.isContactRecipient) {

View File

@ -34,11 +34,17 @@ class ConversationViewModel(
private val _uiState = MutableStateFlow(ConversationUiState())
val uiState: StateFlow<ConversationUiState> = _uiState
private var _recipient: RetrieveOnce<Recipient> = RetrieveOnce {
repository.maybeGetRecipientForThreadId(threadId)
}
val recipient: Recipient?
get() = repository.maybeGetRecipientForThreadId(threadId)
get() = _recipient.value
private var _openGroup: RetrieveOnce<OpenGroup> = RetrieveOnce {
storage.getOpenGroup(threadId)
}
val openGroup: OpenGroup?
get() = storage.getOpenGroup(threadId)
get() = _openGroup.value
val serverCapabilities: List<String>
get() = openGroup?.let { storage.getServerCapabilities(it.server) } ?: listOf()
@ -170,6 +176,10 @@ class ConversationViewModel(
return repository.hasReceived(threadId)
}
fun updateRecipient() {
_recipient.updateTo(repository.maybeGetRecipientForThreadId(threadId))
}
@dagger.assisted.AssistedFactory
interface AssistedFactory {
fun create(threadId: Long, edKeyPair: KeyPair?): Factory
@ -195,3 +205,19 @@ data class ConversationUiState(
val uiMessages: List<UiMessage> = emptyList(),
val isMessageRequestAccepted: Boolean? = null
)
data class RetrieveOnce<T>(val retrieval: () -> T?) {
private var triedToRetrieve: Boolean = false
private var _value: T? = null
val value: T?
get() {
if (triedToRetrieve) { return _value }
triedToRetrieve = true
_value = retrieval()
return _value
}
fun updateTo(value: T?) { _value = value }
}

View File

@ -4,9 +4,11 @@ import android.content.BroadcastReceiver
import android.content.Context
import android.content.Intent
import androidx.work.Constraints
import androidx.work.Data
import androidx.work.ExistingPeriodicWorkPolicy
import androidx.work.NetworkType
import androidx.work.PeriodicWorkRequestBuilder
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.WorkManager
import androidx.work.Worker
import androidx.work.WorkerParameters
@ -21,19 +23,35 @@ import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPolle
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.recover
import org.thoughtcrime.securesms.dependencies.DatabaseComponent
import java.util.concurrent.TimeUnit
class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Worker(context, params) {
enum class Targets {
DMS, CLOSED_GROUPS, OPEN_GROUPS
}
companion object {
const val TAG = "BackgroundPollWorker"
const val INITIAL_SCHEDULE_TIME = "INITIAL_SCHEDULE_TIME"
const val REQUEST_TARGETS = "REQUEST_TARGETS"
@JvmStatic
fun schedulePeriodic(context: Context) {
fun schedulePeriodic(context: Context) = schedulePeriodic(context, targets = Targets.values())
@JvmStatic
fun schedulePeriodic(context: Context, targets: Array<Targets>) {
Log.v(TAG, "Scheduling periodic work.")
val builder = PeriodicWorkRequestBuilder<BackgroundPollWorker>(15, TimeUnit.MINUTES)
val durationMinutes: Long = 15
val builder = PeriodicWorkRequestBuilder<BackgroundPollWorker>(durationMinutes, TimeUnit.MINUTES)
builder.setConstraints(Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build())
val dataBuilder = Data.Builder()
dataBuilder.putLong(INITIAL_SCHEDULE_TIME, System.currentTimeMillis() + (durationMinutes * 60 * 1000))
dataBuilder.putStringArray(REQUEST_TARGETS, targets.map { it.name }.toTypedArray())
builder.setInputData(dataBuilder.build())
val workRequest = builder.build()
WorkManager.getInstance(context).enqueueUniquePeriodicWork(
TAG,
@ -41,6 +59,20 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
workRequest
)
}
@JvmStatic
fun scheduleOnce(context: Context, targets: Array<Targets> = Targets.values()) {
Log.v(TAG, "Scheduling single run.")
val builder = OneTimeWorkRequestBuilder<BackgroundPollWorker>()
builder.setConstraints(Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build())
val dataBuilder = Data.Builder()
dataBuilder.putStringArray(REQUEST_TARGETS, targets.map { it.name }.toTypedArray())
builder.setInputData(dataBuilder.build())
val workRequest = builder.build()
WorkManager.getInstance(context).enqueue(workRequest)
}
}
override fun doWork(): Result {
@ -49,41 +81,89 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
return Result.failure()
}
// If this is a scheduled run and it is happening before the initial scheduled time (as
// periodic background tasks run immediately when scheduled) then don't actually do anything
// because this might slow requests on initial startup or triggered by PNs
val initialScheduleTime = inputData.getLong(INITIAL_SCHEDULE_TIME, -1)
if (initialScheduleTime != -1L && System.currentTimeMillis() < (initialScheduleTime - (60 * 1000))) {
Log.v(TAG, "Skipping initial run.")
return Result.success()
}
// Retrieve the desired targets (defaulting to all if not provided or empty)
val requestTargets: List<Targets> = (inputData.getStringArray(REQUEST_TARGETS) ?: emptyArray())
.map {
try { Targets.valueOf(it) }
catch(e: Exception) { null }
}
.filterNotNull()
.ifEmpty { Targets.values().toList() }
try {
Log.v(TAG, "Performing background poll.")
Log.v(TAG, "Performing background poll for ${requestTargets.joinToString { it.name }}.")
val promises = mutableListOf<Promise<Unit, Exception>>()
// DMs
val userPublicKey = TextSecurePreferences.getLocalNumber(context)!!
val dmsPromise = SnodeAPI.getMessages(userPublicKey).bind { envelopes ->
val params = envelopes.map { (envelope, serverHash) ->
// FIXME: Using a job here seems like a bad idea...
MessageReceiveParameters(envelope.toByteArray(), serverHash, null)
var dmsPromise: Promise<Unit, Exception> = Promise.ofSuccess(Unit)
if (requestTargets.contains(Targets.DMS)) {
val userPublicKey = TextSecurePreferences.getLocalNumber(context)!!
dmsPromise = SnodeAPI.getMessages(userPublicKey).bind { envelopes ->
val params = envelopes.map { (envelope, serverHash) ->
// FIXME: Using a job here seems like a bad idea...
MessageReceiveParameters(envelope.toByteArray(), serverHash, null)
}
BatchMessageReceiveJob(params).executeAsync("background")
}
BatchMessageReceiveJob(params).executeAsync("background")
promises.add(dmsPromise)
}
promises.add(dmsPromise)
// Closed groups
val closedGroupPoller = ClosedGroupPollerV2() // Intentionally don't use shared
val storage = MessagingModuleConfiguration.shared.storage
val allGroupPublicKeys = storage.getAllClosedGroupPublicKeys()
allGroupPublicKeys.iterator().forEach { closedGroupPoller.poll(it) }
if (requestTargets.contains(Targets.CLOSED_GROUPS)) {
val closedGroupPoller = ClosedGroupPollerV2() // Intentionally don't use shared
val storage = MessagingModuleConfiguration.shared.storage
val allGroupPublicKeys = storage.getAllClosedGroupPublicKeys()
allGroupPublicKeys.iterator().forEach { closedGroupPoller.poll(it) }
}
// Open Groups
val threadDB = DatabaseComponent.get(context).lokiThreadDatabase()
val openGroups = threadDB.getAllOpenGroups()
val openGroupServers = openGroups.map { it.value.server }.toSet()
var ogPollError: Exception? = null
for (server in openGroupServers) {
val poller = OpenGroupPoller(server, null)
poller.hasStarted = true
promises.add(poller.poll())
if (requestTargets.contains(Targets.OPEN_GROUPS)) {
val threadDB = DatabaseComponent.get(context).lokiThreadDatabase()
val openGroups = threadDB.getAllOpenGroups()
val openGroupServers = openGroups.map { it.value.server }.toSet()
for (server in openGroupServers) {
val poller = OpenGroupPoller(server, null)
poller.hasStarted = true
// If one of the open group pollers fails we don't want it to cancel the DM
// poller so just hold on to the error for later
promises.add(
poller.poll().recover {
if (dmsPromise.isDone()) {
throw it
}
ogPollError = it
}
)
}
}
// Wait until all the promises are resolved
all(promises).get()
// If the Open Group pollers threw an exception then re-throw it here (now that
// the DM promise has completed)
val localOgPollException = ogPollError
if (localOgPollException != null) {
throw localOgPollException
}
return Result.success()
} catch (exception: Exception) {
Log.e(TAG, "Background poll failed due to error: ${exception.message}.", exception)

View File

@ -1,6 +1,6 @@
package org.thoughtcrime.securesms.service
import android.app.Service
import android.app.ForegroundServiceStartNotAllowedException
import android.content.BroadcastReceiver
import android.content.Context
import android.content.Intent
@ -17,6 +17,8 @@ import android.telephony.PhoneStateListener.LISTEN_NONE
import android.telephony.TelephonyManager
import androidx.core.content.ContextCompat
import androidx.core.os.bundleOf
import androidx.lifecycle.LifecycleService
import androidx.lifecycle.lifecycleScope
import androidx.localbroadcastmanager.content.LocalBroadcastManager
import dagger.hilt.android.AndroidEntryPoint
import org.session.libsession.messaging.calls.CallMessageType
@ -25,6 +27,7 @@ import org.session.libsession.utilities.FutureTaskListener
import org.session.libsession.utilities.recipients.Recipient
import org.session.libsignal.utilities.Log
import org.thoughtcrime.securesms.calls.WebRtcCallActivity
import org.thoughtcrime.securesms.notifications.BackgroundPollWorker
import org.thoughtcrime.securesms.util.CallNotificationBuilder
import org.thoughtcrime.securesms.util.CallNotificationBuilder.Companion.TYPE_ESTABLISHED
import org.thoughtcrime.securesms.util.CallNotificationBuilder.Companion.TYPE_INCOMING_CONNECTING
@ -46,7 +49,7 @@ import javax.inject.Inject
import org.thoughtcrime.securesms.webrtc.data.State as CallState
@AndroidEntryPoint
class WebRtcCallService : Service(), CallManager.WebRtcListener {
class WebRtcCallService : LifecycleService(), CallManager.WebRtcListener {
companion object {
@ -238,7 +241,10 @@ class WebRtcCallService : Service(), CallManager.WebRtcListener {
scheduledReconnect?.cancel(false)
scheduledTimeout = null
scheduledReconnect = null
stopForeground(true)
lifecycleScope.launchWhenCreated {
stopForeground(true)
}
}
private fun isSameCall(intent: Intent): Boolean {
@ -253,7 +259,9 @@ class WebRtcCallService : Service(), CallManager.WebRtcListener {
private fun isIdle() = callManager.isIdle()
override fun onBind(intent: Intent?): IBinder? = null
override fun onBind(intent: Intent): IBinder? {
return super.onBind(intent)
}
override fun onHangup() {
serviceExecutor.execute {
@ -272,7 +280,8 @@ class WebRtcCallService : Service(), CallManager.WebRtcListener {
if (intent == null || intent.action == null) return START_NOT_STICKY
serviceExecutor.execute {
val action = intent.action
Log.i("Loki", "Handling ${intent.action}")
val callId = ((intent.getSerializableExtra(EXTRA_CALL_ID) as? UUID)?.toString() ?: "No callId")
Log.i("Loki", "Handling ${intent.action} for call: ${callId}")
when {
action == ACTION_INCOMING_RING && isSameCall(intent) && callManager.currentConnectionState == CallState.Reconnecting -> handleNewOffer(
intent
@ -361,7 +370,9 @@ class WebRtcCallService : Service(), CallManager.WebRtcListener {
insertMissedCall(recipient, false)
if (callState == CallState.Idle) {
stopForeground(true)
lifecycleScope.launchWhenCreated {
stopForeground(true)
}
}
}
@ -409,6 +420,11 @@ class WebRtcCallService : Service(), CallManager.WebRtcListener {
callManager.initializeAudioForCall()
callManager.startIncomingRinger()
callManager.setAudioEnabled(true)
BackgroundPollWorker.scheduleOnce(
this,
arrayOf(BackgroundPollWorker.Targets.DMS)
)
}
}
@ -573,7 +589,9 @@ class WebRtcCallService : Service(), CallManager.WebRtcListener {
private fun handleRemoteHangup(intent: Intent) {
if (callManager.callId != getCallId(intent)) {
Log.e(TAG, "Hangup for non-active call...")
stopForeground(true)
lifecycleScope.launchWhenCreated {
stopForeground(true)
}
return
}
@ -717,10 +735,16 @@ class WebRtcCallService : Service(), CallManager.WebRtcListener {
}
private fun setCallInProgressNotification(type: Int, recipient: Recipient?) {
startForeground(
CallNotificationBuilder.WEBRTC_NOTIFICATION,
CallNotificationBuilder.getCallInProgressNotification(this, type, recipient)
)
try {
startForeground(
CallNotificationBuilder.WEBRTC_NOTIFICATION,
CallNotificationBuilder.getCallInProgressNotification(this, type, recipient)
)
}
catch(e: ForegroundServiceStartNotAllowedException) {
Log.e(TAG, "Failed to setCallInProgressNotification as a foreground service for type: ${type}, trying to update instead")
}
if (!CallNotificationBuilder.areNotificationsEnabled(this) && type == TYPE_INCOMING_PRE_OFFER) {
// start an intent for the fullscreen
val foregroundIntent = Intent(this, WebRtcCallActivity::class.java)
@ -769,10 +793,14 @@ class WebRtcCallService : Service(), CallManager.WebRtcListener {
callReceiver?.let { receiver ->
unregisterReceiver(receiver)
}
wiredHeadsetStateReceiver?.let { unregisterReceiver(it) }
powerButtonReceiver?.let { unregisterReceiver(it) }
networkChangedReceiver?.unregister(this)
wantsToAnswerReceiver?.let { receiver ->
LocalBroadcastManager.getInstance(this).unregisterReceiver(receiver)
}
powerButtonReceiver = null
wiredHeadsetStateReceiver = null
networkChangedReceiver = null
callReceiver = null
uncaughtExceptionHandlerManager?.unregister()

View File

@ -2,6 +2,7 @@ package org.thoughtcrime.securesms.webrtc
import android.app.NotificationManager
import android.content.Context
import android.content.Intent
import androidx.core.content.ContextCompat
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.coroutineScope
@ -32,6 +33,20 @@ class CallMessageProcessor(private val context: Context, private val textSecureP
companion object {
private const val VERY_EXPIRED_TIME = 15 * 60 * 1000L
fun safeStartService(context: Context, intent: Intent) {
// If the foreground service crashes then it's possible for one of these intents to
// be started in the background (in which case 'startService' will throw a
// 'BackgroundServiceStartNotAllowedException' exception) so catch that case and try
// to re-start the service in the foreground
try { context.startService(intent) }
catch(e: Exception) {
try { ContextCompat.startForegroundService(context, intent) }
catch (e2: Exception) {
Log.e("Loki", "Unable to start CallMessage intent: ${e2.message}")
}
}
}
}
init {
@ -90,7 +105,7 @@ class CallMessageProcessor(private val context: Context, private val textSecureP
private fun incomingHangup(callMessage: CallMessage) {
val callId = callMessage.callId ?: return
val hangupIntent = WebRtcCallService.remoteHangupIntent(context, callId)
context.startService(hangupIntent)
safeStartService(context, hangupIntent)
}
private fun incomingAnswer(callMessage: CallMessage) {
@ -103,7 +118,8 @@ class CallMessageProcessor(private val context: Context, private val textSecureP
sdp = sdp,
callId = callId
)
context.startService(answerIntent)
safeStartService(context, answerIntent)
}
private fun handleIceCandidates(callMessage: CallMessage) {
@ -119,7 +135,7 @@ class CallMessageProcessor(private val context: Context, private val textSecureP
callId = callId,
address = Address.fromSerialized(sender)
)
context.startService(iceIntent)
safeStartService(context, iceIntent)
}
private fun incomingPreOffer(callMessage: CallMessage) {
@ -132,7 +148,7 @@ class CallMessageProcessor(private val context: Context, private val textSecureP
callId = callId,
callTime = callMessage.sentTimestamp!!
)
context.startService(incomingIntent)
safeStartService(context, incomingIntent)
}
private fun incomingCall(callMessage: CallMessage) {
@ -146,7 +162,7 @@ class CallMessageProcessor(private val context: Context, private val textSecureP
callId = callId,
callTime = callMessage.sentTimestamp!!
)
context.startService(incomingIntent)
safeStartService(context, incomingIntent)
}
private fun CallMessage.iceCandidates(): List<IceCandidate> {

View File

@ -94,19 +94,19 @@ class BatchMessageReceiveJob(
} catch (e: Exception) {
when (e) {
is MessageReceiver.Error.DuplicateMessage, MessageReceiver.Error.SelfSend -> {
Log.i(TAG, "Couldn't receive message, failed with error: ${e.message}")
Log.i(TAG, "Couldn't receive message, failed with error: ${e.message} (id: $id)")
}
is MessageReceiver.Error -> {
if (!e.isRetryable) {
Log.e(TAG, "Couldn't receive message, failed permanently", e)
Log.e(TAG, "Couldn't receive message, failed permanently (id: $id)", e)
}
else {
Log.e(TAG, "Couldn't receive message, failed", e)
Log.e(TAG, "Couldn't receive message, failed (id: $id)", e)
failures += messageParameters
}
}
else -> {
Log.e(TAG, "Couldn't receive message, failed", e)
Log.e(TAG, "Couldn't receive message, failed (id: $id)", e)
failures += messageParameters
}
}
@ -155,11 +155,11 @@ class BatchMessageReceiveJob(
else -> MessageReceiver.handle(message, proto, openGroupID)
}
} catch (e: Exception) {
Log.e(TAG, "Couldn't process message.", e)
Log.e(TAG, "Couldn't process message (id: $id)", e)
if (e is MessageReceiver.Error && !e.isRetryable) {
Log.e(TAG, "Message failed permanently",e)
Log.e(TAG, "Message failed permanently (id: $id)", e)
} else {
Log.e(TAG, "Message failed",e)
Log.e(TAG, "Message failed (id: $id)", e)
failures += parameters
}
}
@ -196,12 +196,12 @@ class BatchMessageReceiveJob(
}
private fun handleSuccess(dispatcherName: String) {
Log.i(TAG, "Completed processing of ${messages.size} messages")
Log.i(TAG, "Completed processing of ${messages.size} messages (id: $id)")
this.delegate?.handleJobSucceeded(this, dispatcherName)
}
private fun handleFailure(dispatcherName: String) {
Log.i(TAG, "Handling failure of ${failures.size} messages (${messages.size - failures.size} processed successfully)")
Log.i(TAG, "Handling failure of ${failures.size} messages (${messages.size - failures.size} processed successfully) (id: $id)")
this.delegate?.handleJobFailed(this, dispatcherName, Exception("One or more jobs resulted in failure"))
}

View File

@ -180,7 +180,20 @@ object MessageSender {
val hash = it["hash"] as? String
message.serverHash = hash
handleSuccessfulMessageSend(message, destination, isSyncMessage)
val shouldNotify = ((message is VisibleMessage || message is UnsendRequest || message is CallMessage) && !isSyncMessage)
val shouldNotify: Boolean = when (message) {
is VisibleMessage, is UnsendRequest -> !isSyncMessage
is CallMessage -> {
// Note: Other 'CallMessage' types are too big to send as push notifications
// so only send the 'preOffer' message as a notification
when (message.type) {
SignalServiceProtos.CallMessage.Type.PRE_OFFER -> true
else -> false
}
}
else -> false
}
/*
if (message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) {
shouldNotify = true

View File

@ -54,10 +54,9 @@ class ClosedGroupPollerV2 {
setUpPolling(groupPublicKey)
}
fun stop() {
val storage = MessagingModuleConfiguration.shared.storage
val allGroupPublicKeys = storage.getAllClosedGroupPublicKeys()
allGroupPublicKeys.iterator().forEach { stopPolling(it) }
fun stopAll() {
futures.forEach { it.value.cancel(false) }
isPolling.forEach { isPolling[it.key] = false }
}
fun stopPolling(groupPublicKey: String) {

View File

@ -26,6 +26,7 @@ import org.session.libsignal.utilities.ThreadUtils
import org.session.libsignal.utilities.recover
import org.session.libsignal.utilities.toHexString
import java.util.Date
import java.util.concurrent.atomic.AtomicReference
import kotlin.collections.set
private typealias Path = List<Snode>
@ -43,13 +44,27 @@ object OnionRequestAPI {
private val snodeFailureCount = mutableMapOf<Snode, Int>()
var guardSnodes = setOf<Snode>()
var _paths: AtomicReference<List<Path>?> = AtomicReference(null)
var paths: List<Path> // Not a set to ensure we consistently show the same path to the user
get() = database.getOnionRequestPaths()
get() {
val paths = _paths.get()
if (paths != null) { return paths }
// Storing this in an atomic variable as it was causing a number of background
// ANRs when this value was accessed via the main thread after tapping on
// a notification)
val result = database.getOnionRequestPaths()
_paths.set(result)
return result
}
set(newValue) {
if (newValue.isEmpty()) {
database.clearOnionRequestPaths()
_paths.set(null)
} else {
database.setOnionRequestPaths(newValue)
_paths.set(newValue)
}
}