Clean up background poll worker

This commit is contained in:
Niels Andriesse
2021-05-13 09:24:13 +10:00
parent c5e0589751
commit 26601dbcb2

View File

@@ -8,7 +8,6 @@ import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.all import nl.komponents.kovenant.all
import nl.komponents.kovenant.functional.map import nl.komponents.kovenant.functional.map
import org.session.libsession.messaging.jobs.MessageReceiveJob import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.open_groups.OpenGroup
import org.session.libsession.messaging.open_groups.OpenGroupV2 import org.session.libsession.messaging.open_groups.OpenGroupV2
import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller
import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPoller import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPoller
@@ -17,7 +16,6 @@ import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.TextSecurePreferences import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.utilities.logging.Log import org.session.libsignal.utilities.logging.Log
import org.thoughtcrime.securesms.database.DatabaseFactory import org.thoughtcrime.securesms.database.DatabaseFactory
import java.io.IOException
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Worker(context, params) { class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Worker(context, params) {
@@ -25,45 +23,23 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
companion object { companion object {
const val TAG = "BackgroundPollWorker" const val TAG = "BackgroundPollWorker"
private const val RETRY_ATTEMPTS = 3
@JvmStatic
fun scheduleInstant(context: Context) {
val workRequest = OneTimeWorkRequestBuilder<BackgroundPollWorker>()
.setConstraints(Constraints.Builder()
.setRequiredNetworkType(NetworkType.CONNECTED)
.build()
)
.build()
WorkManager
.getInstance(context)
.enqueue(workRequest)
}
@JvmStatic @JvmStatic
fun schedulePeriodic(context: Context) { fun schedulePeriodic(context: Context) {
Log.v(TAG, "Scheduling periodic work.") Log.v(TAG, "Scheduling periodic work.")
val workRequest = PeriodicWorkRequestBuilder<BackgroundPollWorker>(15, TimeUnit.MINUTES) val builder = PeriodicWorkRequestBuilder<BackgroundPollWorker>(5, TimeUnit.MINUTES)
.setConstraints(Constraints.Builder() builder.setConstraints(Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build())
.setRequiredNetworkType(NetworkType.CONNECTED) val workRequest = builder.build()
.build() WorkManager.getInstance(context).enqueueUniquePeriodicWork(
) TAG,
.build() ExistingPeriodicWorkPolicy.REPLACE,
workRequest
WorkManager )
.getInstance(context)
.enqueueUniquePeriodicWork(
TAG,
ExistingPeriodicWorkPolicy.KEEP,
workRequest
)
} }
} }
override fun doWork(): Result { override fun doWork(): Result {
if (TextSecurePreferences.getLocalNumber(context) == null) { if (TextSecurePreferences.getLocalNumber(context) == null) {
Log.v(TAG, "Background poll is canceled due to the Session user is not set up yet.") Log.v(TAG, "User not registered yet.")
return Result.failure() return Result.failure()
} }
@@ -71,44 +47,41 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
Log.v(TAG, "Performing background poll.") Log.v(TAG, "Performing background poll.")
val promises = mutableListOf<Promise<Unit, Exception>>() val promises = mutableListOf<Promise<Unit, Exception>>()
// Private chats // DMs
val userPublicKey = TextSecurePreferences.getLocalNumber(context)!! val userPublicKey = TextSecurePreferences.getLocalNumber(context)!!
val privateChatsPromise = SnodeAPI.getMessages(userPublicKey).map { envelopes -> val dmsPromise = SnodeAPI.getMessages(userPublicKey).map { envelopes ->
envelopes.map { envelope -> envelopes.map { envelope ->
// FIXME: Using a job here seems like a bad idea... // FIXME: Using a job here seems like a bad idea...
MessageReceiveJob(envelope.toByteArray(), false).executeAsync() MessageReceiveJob(envelope.toByteArray(), false).executeAsync()
} }
} }
promises.addAll(privateChatsPromise.get()) promises.addAll(dmsPromise.get())
// Closed groups // Closed groups
promises.addAll(ClosedGroupPoller().pollOnce()) promises.addAll(ClosedGroupPoller().pollOnce())
// Open Groups // Open Groups
val openGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats().map { (_,chat)-> val openGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats().values
OpenGroup(chat.channel, chat.server, chat.displayName, chat.isDeletable)
}
for (openGroup in openGroups) { for (openGroup in openGroups) {
val poller = OpenGroupPoller(openGroup) val poller = OpenGroupPoller(openGroup)
promises.add(poller.pollForNewMessages()) promises.add(poller.pollForNewMessages())
} }
val openGroupsV2 = DatabaseFactory.getLokiThreadDatabase(context).getAllV2OpenGroups().values.groupBy(OpenGroupV2::server) val v2OpenGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllV2OpenGroups().values.groupBy(OpenGroupV2::server)
openGroupsV2.values.map { groups -> v2OpenGroups.values.map { groups ->
OpenGroupV2Poller(groups) OpenGroupV2Poller(groups)
}.forEach { poller -> }.forEach { poller ->
promises.add(poller.compactPoll(true).map{ /*Unit*/ }) promises.add(poller.compactPoll(true).map { })
} }
// Wait till all the promises get resolved // Wait until all the promises are resolved
all(promises).get() all(promises).get()
return Result.success() return Result.success()
} catch (exception: Exception) { } catch (exception: Exception) {
Log.v(TAG, "Background poll failed due to error: ${exception.message}.", exception) Log.e(TAG, "Background poll failed due to error: ${exception.message}.", exception)
return Result.retry()
return if (runAttemptCount < RETRY_ATTEMPTS) Result.retry() else Result.failure()
} }
} }
@@ -117,8 +90,7 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
override fun onReceive(context: Context, intent: Intent) { override fun onReceive(context: Context, intent: Intent) {
if (intent.action == Intent.ACTION_BOOT_COMPLETED) { if (intent.action == Intent.ACTION_BOOT_COMPLETED) {
Log.v(TAG, "Boot broadcast caught.") Log.v(TAG, "Boot broadcast caught.")
BackgroundPollWorker.scheduleInstant(context) schedulePeriodic(context)
BackgroundPollWorker.schedulePeriodic(context)
} }
} }
} }