mirror of
https://github.com/oxen-io/session-android.git
synced 2025-04-16 08:31:25 +00:00
try to use jobs to mark if all open groups are caught up
This commit is contained in:
parent
12ae8d4051
commit
d52787a661
@ -207,7 +207,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc
|
|||||||
}
|
}
|
||||||
startPollingIfNeeded();
|
startPollingIfNeeded();
|
||||||
|
|
||||||
OpenGroupManager.INSTANCE.setAllCaughtUp(false);
|
|
||||||
OpenGroupManager.INSTANCE.startPolling();
|
OpenGroupManager.INSTANCE.startPolling();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,10 +4,7 @@ import android.content.Context
|
|||||||
import android.net.Uri
|
import android.net.Uri
|
||||||
import okhttp3.HttpUrl
|
import okhttp3.HttpUrl
|
||||||
import org.session.libsession.database.StorageProtocol
|
import org.session.libsession.database.StorageProtocol
|
||||||
import org.session.libsession.messaging.jobs.AttachmentUploadJob
|
import org.session.libsession.messaging.jobs.*
|
||||||
import org.session.libsession.messaging.jobs.Job
|
|
||||||
import org.session.libsession.messaging.jobs.JobQueue
|
|
||||||
import org.session.libsession.messaging.jobs.MessageSendJob
|
|
||||||
import org.session.libsession.messaging.messages.control.ConfigurationMessage
|
import org.session.libsession.messaging.messages.control.ConfigurationMessage
|
||||||
import org.session.libsession.messaging.messages.signal.*
|
import org.session.libsession.messaging.messages.signal.*
|
||||||
import org.session.libsession.messaging.messages.signal.IncomingTextMessage
|
import org.session.libsession.messaging.messages.signal.IncomingTextMessage
|
||||||
@ -210,6 +207,10 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
|
|||||||
return DatabaseFactory.getSessionJobDatabase(context).getMessageSendJob(messageSendJobID)
|
return DatabaseFactory.getSessionJobDatabase(context).getMessageSendJob(messageSendJobID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun getMessageReceivedJob(messageReceiveJobID: String): MessageReceiveJob? {
|
||||||
|
return DatabaseFactory.getSessionJobDatabase(context).getMessageReceiveJob(messageReceiveJobID)
|
||||||
|
}
|
||||||
|
|
||||||
override fun resumeMessageSendJobIfNeeded(messageSendJobID: String) {
|
override fun resumeMessageSendJobIfNeeded(messageSendJobID: String) {
|
||||||
val job = DatabaseFactory.getSessionJobDatabase(context).getMessageSendJob(messageSendJobID) ?: return
|
val job = DatabaseFactory.getSessionJobDatabase(context).getMessageSendJob(messageSendJobID) ?: return
|
||||||
JobQueue.shared.add(job)
|
JobQueue.shared.add(job)
|
||||||
|
@ -19,7 +19,22 @@ object OpenGroupManager {
|
|||||||
private var pollers = mutableMapOf<String, OpenGroupPollerV2>() // One for each server
|
private var pollers = mutableMapOf<String, OpenGroupPollerV2>() // One for each server
|
||||||
private var isPolling = false
|
private var isPolling = false
|
||||||
|
|
||||||
var isAllCaughtUp = false
|
val isAllCaughtUp: Boolean
|
||||||
|
get() {
|
||||||
|
pollers.values.forEach { poller ->
|
||||||
|
val jobID = poller.secondLastJob?.id
|
||||||
|
jobID?.let {
|
||||||
|
val storage = MessagingModuleConfiguration.shared.storage
|
||||||
|
if (storage.getMessageReceivedJob(jobID) == null) {
|
||||||
|
// If the second last job is done, it means we are now handling the last job
|
||||||
|
poller.isCaughtUp = true
|
||||||
|
poller.secondLastJob = null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!poller.isCaughtUp) { return false }
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
fun startPolling() {
|
fun startPolling() {
|
||||||
if (isPolling) { return }
|
if (isPolling) { return }
|
||||||
|
@ -71,6 +71,13 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun getMessageReceiveJob(messageReceiveJobID: String): MessageReceiveJob? {
|
||||||
|
val database = databaseHelper.readableDatabase
|
||||||
|
return database.get(sessionJobTable, "$jobID = ? AND $jobType = ?", arrayOf( messageReceiveJobID, MessageReceiveJob.KEY )) { cursor ->
|
||||||
|
jobFromCursor(cursor) as MessageReceiveJob?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fun cancelPendingMessageSendJobs(threadID: Long) {
|
fun cancelPendingMessageSendJobs(threadID: Long) {
|
||||||
val database = databaseHelper.writableDatabase
|
val database = databaseHelper.writableDatabase
|
||||||
val attachmentUploadJobKeys = mutableListOf<String>()
|
val attachmentUploadJobKeys = mutableListOf<String>()
|
||||||
|
@ -287,9 +287,6 @@ public class DefaultMessageNotifier implements MessageNotifier {
|
|||||||
} finally {
|
} finally {
|
||||||
if (telcoCursor != null) telcoCursor.close();
|
if (telcoCursor != null) telcoCursor.close();
|
||||||
if (pushCursor != null) pushCursor.close();
|
if (pushCursor != null) pushCursor.close();
|
||||||
if (!OpenGroupManager.INSTANCE.isAllCaughtUp()) {
|
|
||||||
OpenGroupManager.INSTANCE.setAllCaughtUp(true);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,6 +10,7 @@ import org.session.libsession.messaging.sending_receiving.notifications.MessageN
|
|||||||
import org.session.libsession.messaging.sending_receiving.pollers.Poller;
|
import org.session.libsession.messaging.sending_receiving.pollers.Poller;
|
||||||
import org.session.libsession.utilities.recipients.Recipient;
|
import org.session.libsession.utilities.recipients.Recipient;
|
||||||
import org.session.libsession.utilities.Debouncer;
|
import org.session.libsession.utilities.Debouncer;
|
||||||
|
import org.session.libsignal.utilities.Log;
|
||||||
import org.session.libsignal.utilities.ThreadUtils;
|
import org.session.libsignal.utilities.ThreadUtils;
|
||||||
import org.thoughtcrime.securesms.ApplicationContext;
|
import org.thoughtcrime.securesms.ApplicationContext;
|
||||||
import org.thoughtcrime.securesms.loki.api.OpenGroupManager;
|
import org.thoughtcrime.securesms.loki.api.OpenGroupManager;
|
||||||
@ -23,7 +24,7 @@ public class OptimizedMessageNotifier implements MessageNotifier {
|
|||||||
@MainThread
|
@MainThread
|
||||||
public OptimizedMessageNotifier(@NonNull MessageNotifier wrapped) {
|
public OptimizedMessageNotifier(@NonNull MessageNotifier wrapped) {
|
||||||
this.wrapped = wrapped;
|
this.wrapped = wrapped;
|
||||||
this.debouncer = new Debouncer(TimeUnit.SECONDS.toMillis(1));
|
this.debouncer = new Debouncer(TimeUnit.SECONDS.toMillis(2));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -66,6 +67,8 @@ public class OptimizedMessageNotifier implements MessageNotifier {
|
|||||||
}
|
}
|
||||||
|
|
||||||
isCaughtUp = isCaughtUp && OpenGroupManager.INSTANCE.isAllCaughtUp();
|
isCaughtUp = isCaughtUp && OpenGroupManager.INSTANCE.isAllCaughtUp();
|
||||||
|
|
||||||
|
Log.d("Ryan", "Is caught up? " + isCaughtUp);
|
||||||
|
|
||||||
if (isCaughtUp) {
|
if (isCaughtUp) {
|
||||||
performOnBackgroundThreadIfNeeded(() -> wrapped.updateNotification(context, threadId));
|
performOnBackgroundThreadIfNeeded(() -> wrapped.updateNotification(context, threadId));
|
||||||
|
@ -5,6 +5,7 @@ import android.content.Context
|
|||||||
import android.net.Uri
|
import android.net.Uri
|
||||||
import org.session.libsession.messaging.jobs.AttachmentUploadJob
|
import org.session.libsession.messaging.jobs.AttachmentUploadJob
|
||||||
import org.session.libsession.messaging.jobs.Job
|
import org.session.libsession.messaging.jobs.Job
|
||||||
|
import org.session.libsession.messaging.jobs.MessageReceiveJob
|
||||||
import org.session.libsession.messaging.jobs.MessageSendJob
|
import org.session.libsession.messaging.jobs.MessageSendJob
|
||||||
import org.session.libsession.messaging.messages.control.ConfigurationMessage
|
import org.session.libsession.messaging.messages.control.ConfigurationMessage
|
||||||
import org.session.libsession.messaging.messages.visible.Attachment
|
import org.session.libsession.messaging.messages.visible.Attachment
|
||||||
@ -49,6 +50,7 @@ interface StorageProtocol {
|
|||||||
fun getAllPendingJobs(type: String): Map<String,Job?>
|
fun getAllPendingJobs(type: String): Map<String,Job?>
|
||||||
fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob?
|
fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob?
|
||||||
fun getMessageSendJob(messageSendJobID: String): MessageSendJob?
|
fun getMessageSendJob(messageSendJobID: String): MessageSendJob?
|
||||||
|
fun getMessageReceivedJob(messageReceiveJobID: String): MessageReceiveJob?
|
||||||
fun resumeMessageSendJobIfNeeded(messageSendJobID: String)
|
fun resumeMessageSendJobIfNeeded(messageSendJobID: String)
|
||||||
fun isJobCanceled(job: Job): Boolean
|
fun isJobCanceled(job: Job): Boolean
|
||||||
|
|
||||||
|
@ -18,6 +18,8 @@ import java.util.concurrent.TimeUnit
|
|||||||
|
|
||||||
class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) {
|
class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) {
|
||||||
var hasStarted = false
|
var hasStarted = false
|
||||||
|
var isCaughtUp = false
|
||||||
|
var secondLastJob: MessageReceiveJob? = null
|
||||||
private var future: ScheduledFuture<*>? = null
|
private var future: ScheduledFuture<*>? = null
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
@ -43,6 +45,9 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
|
|||||||
val openGroupID = "$server.$room"
|
val openGroupID = "$server.$room"
|
||||||
handleNewMessages(openGroupID, response.messages, isBackgroundPoll)
|
handleNewMessages(openGroupID, response.messages, isBackgroundPoll)
|
||||||
handleDeletedMessages(openGroupID, response.deletions)
|
handleDeletedMessages(openGroupID, response.deletions)
|
||||||
|
if (secondLastJob == null && !isCaughtUp) {
|
||||||
|
isCaughtUp = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}.always {
|
}.always {
|
||||||
executorService?.schedule(this@OpenGroupPollerV2::poll, OpenGroupPollerV2.pollInterval, TimeUnit.MILLISECONDS)
|
executorService?.schedule(this@OpenGroupPollerV2::poll, OpenGroupPollerV2.pollInterval, TimeUnit.MILLISECONDS)
|
||||||
@ -51,6 +56,7 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
|
|||||||
|
|
||||||
private fun handleNewMessages(openGroupID: String, messages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
|
private fun handleNewMessages(openGroupID: String, messages: List<OpenGroupMessageV2>, isBackgroundPoll: Boolean) {
|
||||||
if (!hasStarted) { return }
|
if (!hasStarted) { return }
|
||||||
|
var latestJob: MessageReceiveJob? = null
|
||||||
messages.sortedBy { it.serverID!! }.forEach { message ->
|
messages.sortedBy { it.serverID!! }.forEach { message ->
|
||||||
try {
|
try {
|
||||||
val senderPublicKey = message.sender!!
|
val senderPublicKey = message.sender!!
|
||||||
@ -66,11 +72,16 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
|
|||||||
job.executeAsync()
|
job.executeAsync()
|
||||||
} else {
|
} else {
|
||||||
JobQueue.shared.add(job)
|
JobQueue.shared.add(job)
|
||||||
|
if (!isCaughtUp) {
|
||||||
|
secondLastJob = latestJob
|
||||||
|
}
|
||||||
|
latestJob = job
|
||||||
}
|
}
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
Log.e("Loki", "Exception parsing message", e)
|
Log.e("Loki", "Exception parsing message", e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Log.d("Ryan", "Finish a round of polling in thread $openGroupID")
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun handleDeletedMessages(openGroupID: String, deletedMessageServerIDs: List<Long>) {
|
private fun handleDeletedMessages(openGroupID: String, deletedMessageServerIDs: List<Long>) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user