mirror of
https://github.com/oxen-io/session-android.git
synced 2025-01-11 21:43:38 +00:00
refactor: remove old pollers from BackgroundPollWorker.kt and deleted unused pollers
This commit is contained in:
parent
0fa62d1b2a
commit
de0f9a26df
@ -7,12 +7,14 @@ import androidx.work.*
|
|||||||
import nl.komponents.kovenant.Promise
|
import nl.komponents.kovenant.Promise
|
||||||
import nl.komponents.kovenant.all
|
import nl.komponents.kovenant.all
|
||||||
import nl.komponents.kovenant.functional.map
|
import nl.komponents.kovenant.functional.map
|
||||||
|
import org.session.libsession.messaging.jobs.MessageReceiveJob
|
||||||
|
import org.session.libsession.messaging.opengroups.OpenGroup
|
||||||
|
import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPoller
|
||||||
import org.session.libsession.utilities.TextSecurePreferences
|
import org.session.libsession.utilities.TextSecurePreferences
|
||||||
import org.session.libsignal.service.api.messages.SignalServiceEnvelope
|
|
||||||
import org.session.libsignal.service.loki.api.SnodeAPI
|
import org.session.libsignal.service.loki.api.SnodeAPI
|
||||||
import org.session.libsignal.utilities.logging.Log
|
import org.session.libsignal.utilities.logging.Log
|
||||||
|
import org.thoughtcrime.securesms.ApplicationContext
|
||||||
import org.thoughtcrime.securesms.database.DatabaseFactory
|
import org.thoughtcrime.securesms.database.DatabaseFactory
|
||||||
import org.thoughtcrime.securesms.jobs.PushContentReceiveJob
|
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Worker(context, params) {
|
class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Worker(context, params) {
|
||||||
@ -69,20 +71,21 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
|
|||||||
// Private chats
|
// Private chats
|
||||||
val userPublicKey = TextSecurePreferences.getLocalNumber(context)!!
|
val userPublicKey = TextSecurePreferences.getLocalNumber(context)!!
|
||||||
val privateChatsPromise = SnodeAPI.shared.getMessages(userPublicKey).map { envelopes ->
|
val privateChatsPromise = SnodeAPI.shared.getMessages(userPublicKey).map { envelopes ->
|
||||||
envelopes.forEach {
|
envelopes.map { envelope ->
|
||||||
PushContentReceiveJob(context).processEnvelope(SignalServiceEnvelope(it), false)
|
MessageReceiveJob(envelope.toByteArray(), false).executeAsync()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
promises.add(privateChatsPromise)
|
promises.addAll(privateChatsPromise.get())
|
||||||
|
|
||||||
// Closed groups
|
// Closed groups
|
||||||
// ClosedGroupPoller.configureIfNeeded(context)
|
promises.addAll(ApplicationContext.getInstance(context).closedGroupPoller.pollOnce())
|
||||||
// promises.addAll(ClosedGroupPoller.shared.pollOnce())
|
|
||||||
|
|
||||||
// Open Groups
|
// Open Groups
|
||||||
val openGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats().map { it.value }
|
val openGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats().map { (_,chat)->
|
||||||
|
OpenGroup(chat.channel, chat.server, chat.displayName, chat.isDeletable)
|
||||||
|
}
|
||||||
for (openGroup in openGroups) {
|
for (openGroup in openGroups) {
|
||||||
val poller = PublicChatPoller(context, openGroup)
|
val poller = OpenGroupPoller(openGroup)
|
||||||
promises.add(poller.pollForNewMessages())
|
promises.add(poller.pollForNewMessages())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,91 +0,0 @@
|
|||||||
package org.thoughtcrime.securesms.loki.api
|
|
||||||
|
|
||||||
import android.content.Context
|
|
||||||
import android.os.Handler
|
|
||||||
import nl.komponents.kovenant.Promise
|
|
||||||
import nl.komponents.kovenant.functional.bind
|
|
||||||
import nl.komponents.kovenant.functional.map
|
|
||||||
import org.thoughtcrime.securesms.jobs.PushContentReceiveJob
|
|
||||||
import org.session.libsignal.utilities.logging.Log
|
|
||||||
import org.session.libsignal.utilities.successBackground
|
|
||||||
import org.session.libsignal.service.api.messages.SignalServiceEnvelope
|
|
||||||
import org.session.libsignal.service.loki.api.SnodeAPI
|
|
||||||
import org.session.libsignal.service.loki.api.SwarmAPI
|
|
||||||
import org.session.libsignal.service.loki.utilities.getRandomElementOrNull
|
|
||||||
import org.thoughtcrime.securesms.database.DatabaseFactory
|
|
||||||
|
|
||||||
class ClosedGroupPoller private constructor(private val context: Context) {
|
|
||||||
private var isPolling = false
|
|
||||||
private val handler: Handler by lazy { Handler() }
|
|
||||||
|
|
||||||
private val task = object : Runnable {
|
|
||||||
|
|
||||||
override fun run() {
|
|
||||||
poll()
|
|
||||||
handler.postDelayed(this, pollInterval)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// region Settings
|
|
||||||
companion object {
|
|
||||||
private val pollInterval: Long = 4 * 1000
|
|
||||||
|
|
||||||
public lateinit var shared: ClosedGroupPoller
|
|
||||||
|
|
||||||
public fun configureIfNeeded(context: Context) {
|
|
||||||
if (::shared.isInitialized) { return; }
|
|
||||||
shared = ClosedGroupPoller(context)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// endregion
|
|
||||||
|
|
||||||
// region Error
|
|
||||||
class InsufficientSnodesException() : Exception("No snodes left to poll.")
|
|
||||||
class PollingCanceledException() : Exception("Polling canceled.")
|
|
||||||
// endregion
|
|
||||||
|
|
||||||
// region Public API
|
|
||||||
fun startIfNeeded() {
|
|
||||||
if (isPolling) { return }
|
|
||||||
isPolling = true
|
|
||||||
task.run()
|
|
||||||
}
|
|
||||||
|
|
||||||
fun pollOnce(): List<Promise<Unit, Exception>> {
|
|
||||||
if (isPolling) { return listOf() }
|
|
||||||
isPolling = true
|
|
||||||
return poll()
|
|
||||||
}
|
|
||||||
|
|
||||||
fun stopIfNeeded() {
|
|
||||||
isPolling = false
|
|
||||||
handler.removeCallbacks(task)
|
|
||||||
}
|
|
||||||
// endregion
|
|
||||||
|
|
||||||
// region Private API
|
|
||||||
private fun poll(): List<Promise<Unit, Exception>> {
|
|
||||||
if (!isPolling) { return listOf() }
|
|
||||||
val publicKeys = DatabaseFactory.getLokiAPIDatabase(context).getAllClosedGroupPublicKeys()
|
|
||||||
return publicKeys.map { publicKey ->
|
|
||||||
val promise = SwarmAPI.shared.getSwarm(publicKey).bind { swarm ->
|
|
||||||
val snode = swarm.getRandomElementOrNull() ?: throw InsufficientSnodesException() // Should be cryptographically secure
|
|
||||||
if (!isPolling) { throw PollingCanceledException() }
|
|
||||||
SnodeAPI.shared.getRawMessages(snode, publicKey).map {SnodeAPI.shared.parseRawMessagesResponse(it, snode, publicKey) }
|
|
||||||
}
|
|
||||||
promise.successBackground { messages ->
|
|
||||||
if (messages.isNotEmpty()) {
|
|
||||||
Log.d("Loki", "Received ${messages.count()} new message(s) in closed group with public key: $publicKey.")
|
|
||||||
}
|
|
||||||
messages.forEach {
|
|
||||||
PushContentReceiveJob(context).processEnvelope(SignalServiceEnvelope(it), false)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
promise.fail {
|
|
||||||
Log.d("Loki", "Polling failed for closed group with public key: $publicKey due to error: $it.")
|
|
||||||
}
|
|
||||||
promise.map { Unit }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// endregion
|
|
||||||
}
|
|
@ -1,238 +0,0 @@
|
|||||||
package org.thoughtcrime.securesms.loki.api
|
|
||||||
|
|
||||||
import android.content.Context
|
|
||||||
import android.os.Handler
|
|
||||||
import org.session.libsignal.utilities.logging.Log
|
|
||||||
import nl.komponents.kovenant.Promise
|
|
||||||
import nl.komponents.kovenant.functional.bind
|
|
||||||
import nl.komponents.kovenant.functional.map
|
|
||||||
import org.thoughtcrime.securesms.ApplicationContext
|
|
||||||
import org.session.libsession.utilities.IdentityKeyUtil
|
|
||||||
import org.session.libsession.messaging.threads.Address
|
|
||||||
import org.thoughtcrime.securesms.database.DatabaseFactory
|
|
||||||
import org.thoughtcrime.securesms.jobs.PushDecryptJob
|
|
||||||
import org.thoughtcrime.securesms.jobs.RetrieveProfileAvatarJob
|
|
||||||
import org.session.libsession.messaging.threads.recipients.Recipient
|
|
||||||
import org.session.libsession.utilities.TextSecurePreferences
|
|
||||||
import org.session.libsignal.libsignal.util.guava.Optional
|
|
||||||
import org.session.libsignal.utilities.successBackground
|
|
||||||
import org.session.libsignal.service.api.messages.SignalServiceAttachmentPointer
|
|
||||||
import org.session.libsignal.service.api.messages.SignalServiceContent
|
|
||||||
import org.session.libsignal.service.api.messages.SignalServiceDataMessage
|
|
||||||
import org.session.libsignal.service.api.messages.SignalServiceGroup
|
|
||||||
import org.session.libsignal.service.api.push.SignalServiceAddress
|
|
||||||
import org.session.libsignal.service.loki.api.fileserver.FileServerAPI
|
|
||||||
import org.session.libsignal.service.loki.api.opengroups.PublicChat
|
|
||||||
import org.session.libsignal.service.loki.api.opengroups.PublicChatAPI
|
|
||||||
import org.session.libsignal.service.loki.api.opengroups.PublicChatMessage
|
|
||||||
import java.security.MessageDigest
|
|
||||||
import java.util.*
|
|
||||||
|
|
||||||
class PublicChatPoller(private val context: Context, private val group: PublicChat) {
|
|
||||||
private val handler by lazy { Handler() }
|
|
||||||
private var hasStarted = false
|
|
||||||
private var isPollOngoing = false
|
|
||||||
public var isCaughtUp = false
|
|
||||||
|
|
||||||
// region Convenience
|
|
||||||
private val userHexEncodedPublicKey = TextSecurePreferences.getLocalNumber(context)!!
|
|
||||||
private var displayNameUpdatees = setOf<String>()
|
|
||||||
|
|
||||||
private val api: PublicChatAPI
|
|
||||||
get() = {
|
|
||||||
val userPrivateKey = IdentityKeyUtil.getIdentityKeyPair(context).privateKey.serialize()
|
|
||||||
val lokiAPIDatabase = DatabaseFactory.getLokiAPIDatabase(context)
|
|
||||||
val lokiUserDatabase = DatabaseFactory.getLokiUserDatabase(context)
|
|
||||||
val openGroupDatabase = DatabaseFactory.getGroupDatabase(context)
|
|
||||||
PublicChatAPI(userHexEncodedPublicKey, userPrivateKey, lokiAPIDatabase, lokiUserDatabase, openGroupDatabase)
|
|
||||||
}()
|
|
||||||
// endregion
|
|
||||||
|
|
||||||
// region Tasks
|
|
||||||
private val pollForNewMessagesTask = object : Runnable {
|
|
||||||
|
|
||||||
override fun run() {
|
|
||||||
pollForNewMessages()
|
|
||||||
handler.postDelayed(this, pollForNewMessagesInterval)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private val pollForDeletedMessagesTask = object : Runnable {
|
|
||||||
|
|
||||||
override fun run() {
|
|
||||||
pollForDeletedMessages()
|
|
||||||
handler.postDelayed(this, pollForDeletedMessagesInterval)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private val pollForModeratorsTask = object : Runnable {
|
|
||||||
|
|
||||||
override fun run() {
|
|
||||||
pollForModerators()
|
|
||||||
handler.postDelayed(this, pollForModeratorsInterval)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private val pollForDisplayNamesTask = object : Runnable {
|
|
||||||
|
|
||||||
override fun run() {
|
|
||||||
pollForDisplayNames()
|
|
||||||
handler.postDelayed(this, pollForDisplayNamesInterval)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// endregion
|
|
||||||
|
|
||||||
// region Settings
|
|
||||||
companion object {
|
|
||||||
private val pollForNewMessagesInterval: Long = 4 * 1000
|
|
||||||
private val pollForDeletedMessagesInterval: Long = 60 * 1000
|
|
||||||
private val pollForModeratorsInterval: Long = 10 * 60 * 1000
|
|
||||||
private val pollForDisplayNamesInterval: Long = 60 * 1000
|
|
||||||
}
|
|
||||||
// endregion
|
|
||||||
|
|
||||||
// region Lifecycle
|
|
||||||
fun startIfNeeded() {
|
|
||||||
if (hasStarted) return
|
|
||||||
pollForNewMessagesTask.run()
|
|
||||||
pollForDeletedMessagesTask.run()
|
|
||||||
pollForModeratorsTask.run()
|
|
||||||
pollForDisplayNamesTask.run()
|
|
||||||
hasStarted = true
|
|
||||||
}
|
|
||||||
|
|
||||||
fun stop() {
|
|
||||||
handler.removeCallbacks(pollForNewMessagesTask)
|
|
||||||
handler.removeCallbacks(pollForDeletedMessagesTask)
|
|
||||||
handler.removeCallbacks(pollForModeratorsTask)
|
|
||||||
handler.removeCallbacks(pollForDisplayNamesTask)
|
|
||||||
hasStarted = false
|
|
||||||
}
|
|
||||||
// endregion
|
|
||||||
|
|
||||||
// region Polling
|
|
||||||
private fun getDataMessage(message: PublicChatMessage): SignalServiceDataMessage {
|
|
||||||
val id = group.id.toByteArray()
|
|
||||||
val serviceGroup = SignalServiceGroup(SignalServiceGroup.Type.UPDATE, id, SignalServiceGroup.GroupType.PUBLIC_CHAT, null, null, null, null)
|
|
||||||
val quote = if (message.quote != null) {
|
|
||||||
SignalServiceDataMessage.Quote(message.quote!!.quotedMessageTimestamp, SignalServiceAddress(message.quote!!.quoteePublicKey), message.quote!!.quotedMessageBody, listOf())
|
|
||||||
} else {
|
|
||||||
null
|
|
||||||
}
|
|
||||||
val attachments = message.attachments.mapNotNull { attachment ->
|
|
||||||
if (attachment.kind != PublicChatMessage.Attachment.Kind.Attachment) { return@mapNotNull null }
|
|
||||||
SignalServiceAttachmentPointer(
|
|
||||||
attachment.serverID,
|
|
||||||
attachment.contentType,
|
|
||||||
ByteArray(0),
|
|
||||||
Optional.of(attachment.size),
|
|
||||||
Optional.absent(),
|
|
||||||
attachment.width, attachment.height,
|
|
||||||
Optional.absent(),
|
|
||||||
Optional.of(attachment.fileName),
|
|
||||||
false,
|
|
||||||
Optional.fromNullable(attachment.caption),
|
|
||||||
attachment.url)
|
|
||||||
}
|
|
||||||
val linkPreview = message.attachments.firstOrNull { it.kind == PublicChatMessage.Attachment.Kind.LinkPreview }
|
|
||||||
val signalLinkPreviews = mutableListOf<SignalServiceDataMessage.Preview>()
|
|
||||||
if (linkPreview != null) {
|
|
||||||
val attachment = SignalServiceAttachmentPointer(
|
|
||||||
linkPreview.serverID,
|
|
||||||
linkPreview.contentType,
|
|
||||||
ByteArray(0),
|
|
||||||
Optional.of(linkPreview.size),
|
|
||||||
Optional.absent(),
|
|
||||||
linkPreview.width, linkPreview.height,
|
|
||||||
Optional.absent(),
|
|
||||||
Optional.of(linkPreview.fileName),
|
|
||||||
false,
|
|
||||||
Optional.fromNullable(linkPreview.caption),
|
|
||||||
linkPreview.url)
|
|
||||||
signalLinkPreviews.add(SignalServiceDataMessage.Preview(linkPreview.linkPreviewURL!!, linkPreview.linkPreviewTitle!!, Optional.of(attachment)))
|
|
||||||
}
|
|
||||||
val body = if (message.body == message.timestamp.toString()) "" else message.body // Workaround for the fact that the back-end doesn't accept messages without a body
|
|
||||||
val syncTarget = if (message.senderPublicKey == userHexEncodedPublicKey) group.id else null
|
|
||||||
return SignalServiceDataMessage(message.timestamp, serviceGroup, attachments, body, 0, false, null, quote, null, signalLinkPreviews, null, syncTarget)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun pollForNewMessages(): Promise<Unit, Exception> {
|
|
||||||
if (isPollOngoing) { return Promise.of(Unit) }
|
|
||||||
isPollOngoing = true
|
|
||||||
val userPrivateKey = IdentityKeyUtil.getIdentityKeyPair(context).privateKey.serialize()
|
|
||||||
val apiDB = DatabaseFactory.getLokiAPIDatabase(context)
|
|
||||||
FileServerAPI.configure(userHexEncodedPublicKey, userPrivateKey, apiDB)
|
|
||||||
// Kovenant propagates a context to chained promises, so LokiPublicChatAPI.sharedContext should be used for all of the below
|
|
||||||
val promise = api.getMessages(group.channel, group.server).bind(PublicChatAPI.sharedContext) { messages ->
|
|
||||||
Promise.of(messages)
|
|
||||||
}
|
|
||||||
promise.successBackground { messages ->
|
|
||||||
// Process messages in the background
|
|
||||||
messages.forEach { message ->
|
|
||||||
// If the sender of the current message is not a slave device, set the display name in the database
|
|
||||||
val senderDisplayName = "${message.displayName} (...${message.senderPublicKey.takeLast(8)})"
|
|
||||||
DatabaseFactory.getLokiUserDatabase(context).setServerDisplayName(group.id, message.senderPublicKey, senderDisplayName)
|
|
||||||
val senderHexEncodedPublicKey = message.senderPublicKey
|
|
||||||
val serviceDataMessage = getDataMessage(message)
|
|
||||||
val serviceContent = SignalServiceContent(serviceDataMessage, senderHexEncodedPublicKey, SignalServiceAddress.DEFAULT_DEVICE_ID, message.serverTimestamp, false)
|
|
||||||
if (serviceDataMessage.quote.isPresent || (serviceDataMessage.attachments.isPresent && serviceDataMessage.attachments.get().size > 0) || serviceDataMessage.previews.isPresent) {
|
|
||||||
PushDecryptJob(context).handleMediaMessage(serviceContent, serviceDataMessage, Optional.absent(), Optional.of(message.serverID))
|
|
||||||
} else {
|
|
||||||
PushDecryptJob(context).handleTextMessage(serviceContent, serviceDataMessage, Optional.absent(), Optional.of(message.serverID))
|
|
||||||
}
|
|
||||||
// Update profile picture if needed
|
|
||||||
val senderAsRecipient = Recipient.from(context, Address.fromSerialized(senderHexEncodedPublicKey), false)
|
|
||||||
if (message.profilePicture != null && message.profilePicture!!.url.isNotEmpty()) {
|
|
||||||
val profileKey = message.profilePicture!!.profileKey
|
|
||||||
val url = message.profilePicture!!.url
|
|
||||||
if (senderAsRecipient.profileKey == null || !MessageDigest.isEqual(senderAsRecipient.profileKey, profileKey)) {
|
|
||||||
val database = DatabaseFactory.getRecipientDatabase(context)
|
|
||||||
database.setProfileKey(senderAsRecipient, profileKey)
|
|
||||||
ApplicationContext.getInstance(context).jobManager.add(RetrieveProfileAvatarJob(senderAsRecipient, url))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
isCaughtUp = true
|
|
||||||
isPollOngoing = false
|
|
||||||
}
|
|
||||||
promise.fail {
|
|
||||||
Log.d("Loki", "Failed to get messages for group chat with ID: ${group.channel} on server: ${group.server}.")
|
|
||||||
isPollOngoing = false
|
|
||||||
}
|
|
||||||
return promise.map { Unit }
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun pollForDisplayNames() {
|
|
||||||
if (displayNameUpdatees.isEmpty()) { return }
|
|
||||||
val hexEncodedPublicKeys = displayNameUpdatees
|
|
||||||
displayNameUpdatees = setOf()
|
|
||||||
api.getDisplayNames(hexEncodedPublicKeys, group.server).successBackground { mapping ->
|
|
||||||
for (pair in mapping.entries) {
|
|
||||||
val senderDisplayName = "${pair.value} (...${pair.key.takeLast(8)})"
|
|
||||||
DatabaseFactory.getLokiUserDatabase(context).setServerDisplayName(group.id, pair.key, senderDisplayName)
|
|
||||||
}
|
|
||||||
}.fail {
|
|
||||||
displayNameUpdatees = displayNameUpdatees.union(hexEncodedPublicKeys)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun pollForDeletedMessages() {
|
|
||||||
api.getDeletedMessageServerIDs(group.channel, group.server).success { deletedMessageServerIDs ->
|
|
||||||
val lokiMessageDatabase = DatabaseFactory.getLokiMessageDatabase(context)
|
|
||||||
val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { lokiMessageDatabase.getMessageID(it) }
|
|
||||||
val smsMessageDatabase = DatabaseFactory.getSmsDatabase(context)
|
|
||||||
val mmsMessageDatabase = DatabaseFactory.getMmsDatabase(context)
|
|
||||||
deletedMessageIDs.forEach {
|
|
||||||
smsMessageDatabase.deleteMessage(it)
|
|
||||||
mmsMessageDatabase.delete(it)
|
|
||||||
}
|
|
||||||
}.fail {
|
|
||||||
Log.d("Loki", "Failed to get deleted messages for group chat with ID: ${group.channel} on server: ${group.server}.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun pollForModerators() {
|
|
||||||
api.getModerators(group.channel, group.server)
|
|
||||||
}
|
|
||||||
// endregion
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user