feat: adding config sync functionality, refactoring jobs to execute in suspend context to do some nice coroutine execution

This commit is contained in:
0x330a 2023-02-08 17:09:52 +11:00
parent c639d57471
commit c0bcc37d2e
No known key found for this signature in database
GPG Key ID: 267811D6E6A2698C
16 changed files with 90 additions and 23 deletions

View File

@ -126,6 +126,8 @@ class ConfigFactory(private val context: Context,
} }
override fun notifyUpdates(forConfigObject: ConfigBase) { override fun notifyUpdates(forConfigObject: ConfigBase) {
if (!forConfigObject.needsDump()) return
when (forConfigObject) { when (forConfigObject) {
is UserProfile -> updateUser(forConfigObject) is UserProfile -> updateUser(forConfigObject)
is Contacts -> updateContacts(forConfigObject) is Contacts -> updateContacts(forConfigObject)

View File

@ -6,10 +6,10 @@ import dagger.Provides
import dagger.hilt.InstallIn import dagger.hilt.InstallIn
import dagger.hilt.android.qualifiers.ApplicationContext import dagger.hilt.android.qualifiers.ApplicationContext
import dagger.hilt.components.SingletonComponent import dagger.hilt.components.SingletonComponent
import org.session.libsession.database.StorageProtocol
import org.session.libsession.utilities.TextSecurePreferences import org.session.libsession.utilities.TextSecurePreferences
import org.thoughtcrime.securesms.crypto.KeyPairUtilities import org.thoughtcrime.securesms.crypto.KeyPairUtilities
import org.thoughtcrime.securesms.database.ConfigDatabase import org.thoughtcrime.securesms.database.ConfigDatabase
import org.thoughtcrime.securesms.database.Storage
import javax.inject.Singleton import javax.inject.Singleton
@Module @Module
@ -23,7 +23,7 @@ object SessionUtilModule {
@Provides @Provides
@Singleton @Singleton
fun provideConfigFactory(@ApplicationContext context: Context, configDatabase: ConfigDatabase, storage: StorageProtocol): ConfigFactory = fun provideConfigFactory(@ApplicationContext context: Context, configDatabase: ConfigDatabase, storage: Storage): ConfigFactory =
ConfigFactory(context, configDatabase, storage) { ConfigFactory(context, configDatabase, storage) {
val localUserPublicKey = TextSecurePreferences.getLocalNumber(context) val localUserPublicKey = TextSecurePreferences.getLocalNumber(context)
val secretKey = maybeUserEdSecretKey(context) val secretKey = maybeUserEdSecretKey(context)

View File

@ -42,7 +42,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
private val TS_INCOMING_MESSAGE_ID_KEY = "tsIncoming_message_id" private val TS_INCOMING_MESSAGE_ID_KEY = "tsIncoming_message_id"
} }
override fun execute() { override suspend fun execute() {
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val threadID = storage.getThreadIdForMms(databaseMessageID) val threadID = storage.getThreadIdForMms(databaseMessageID)

View File

@ -16,7 +16,11 @@ import org.session.libsession.utilities.DecodedAudio
import org.session.libsession.utilities.InputStreamMediaDataSource import org.session.libsession.utilities.InputStreamMediaDataSource
import org.session.libsession.utilities.UploadResult import org.session.libsession.utilities.UploadResult
import org.session.libsignal.messages.SignalServiceAttachmentStream import org.session.libsignal.messages.SignalServiceAttachmentStream
import org.session.libsignal.streams.* import org.session.libsignal.streams.AttachmentCipherOutputStream
import org.session.libsignal.streams.AttachmentCipherOutputStreamFactory
import org.session.libsignal.streams.DigestingRequestBody
import org.session.libsignal.streams.PaddingInputStream
import org.session.libsignal.streams.PlaintextOutputStreamFactory
import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.PushAttachmentData import org.session.libsignal.utilities.PushAttachmentData
import org.session.libsignal.utilities.Util import org.session.libsignal.utilities.Util
@ -45,7 +49,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
private val MESSAGE_SEND_JOB_ID_KEY = "message_send_job_id" private val MESSAGE_SEND_JOB_ID_KEY = "message_send_job_id"
} }
override fun execute() { override suspend fun execute() {
try { try {
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider

View File

@ -3,9 +3,7 @@ package org.session.libsession.messaging.jobs
import okhttp3.HttpUrl import okhttp3.HttpUrl
import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.open_groups.OpenGroup import org.session.libsession.messaging.open_groups.OpenGroup
import org.session.libsession.messaging.open_groups.OpenGroupApi
import org.session.libsession.messaging.utilities.Data import org.session.libsession.messaging.utilities.Data
import org.session.libsession.utilities.GroupUtil
import org.session.libsession.utilities.OpenGroupUrlParser import org.session.libsession.utilities.OpenGroupUrlParser
import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Log
@ -29,7 +27,7 @@ class BackgroundGroupAddJob(val joinUrl: String): Job {
return "$server.$room" return "$server.$room"
} }
override fun execute() { override suspend fun execute() {
try { try {
val openGroup = OpenGroupUrlParser.parseUrl(joinUrl) val openGroup = OpenGroupUrlParser.parseUrl(joinUrl)
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage

View File

@ -15,7 +15,11 @@ import org.session.libsession.messaging.messages.control.UnsendRequest
import org.session.libsession.messaging.messages.visible.ParsedMessage import org.session.libsession.messaging.messages.visible.ParsedMessage
import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.open_groups.OpenGroupApi import org.session.libsession.messaging.open_groups.OpenGroupApi
import org.session.libsession.messaging.sending_receiving.* import org.session.libsession.messaging.sending_receiving.MessageReceiver
import org.session.libsession.messaging.sending_receiving.handle
import org.session.libsession.messaging.sending_receiving.handleOpenGroupReactions
import org.session.libsession.messaging.sending_receiving.handleUnsendRequest
import org.session.libsession.messaging.sending_receiving.handleVisibleMessage
import org.session.libsession.messaging.utilities.Data import org.session.libsession.messaging.utilities.Data
import org.session.libsession.messaging.utilities.SessionId import org.session.libsession.messaging.utilities.SessionId
import org.session.libsession.messaging.utilities.SodiumUtilities import org.session.libsession.messaging.utilities.SodiumUtilities
@ -66,7 +70,7 @@ class BatchMessageReceiveJob(
return storage.getOrCreateThreadIdFor(senderOrSync, message.groupPublicKey, openGroupID) return storage.getOrCreateThreadIdFor(senderOrSync, message.groupPublicKey, openGroupID)
} }
override fun execute() { override suspend fun execute() {
executeAsync().get() executeAsync().get()
} }

View File

@ -0,0 +1,61 @@
package org.session.libsession.messaging.jobs
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.utilities.Data
// only contact (self) and closed group destinations will be supported
data class ConfigurationSyncJob(val destination: Destination): Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
override val maxFailureCount: Int = 1
override suspend fun execute() {
TODO("Not yet implemented")
}
override fun serialize(): Data {
val (type, address) = when (destination) {
is Destination.Contact -> CONTACT_TYPE to destination.publicKey
is Destination.ClosedGroup -> GROUP_TYPE to destination.groupPublicKey
else -> return Data.EMPTY
}
return Data.Builder()
.putInt(DESTINATION_TYPE_KEY, type)
.putString(DESTINATION_ADDRESS_KEY, address)
.build()
}
override fun getFactoryKey(): String = KEY
companion object {
const val TAG = "ConfigSyncJob"
const val KEY = "ConfigSyncJob"
// Keys used for DB storage
const val DESTINATION_ADDRESS_KEY = "destinationAddress"
const val DESTINATION_TYPE_KEY = "destinationType"
// type mappings
const val CONTACT_TYPE = 1
const val GROUP_TYPE = 2
}
class Factory: Job.Factory<ConfigurationSyncJob> {
override fun create(data: Data): ConfigurationSyncJob? {
if (!data.hasInt(DESTINATION_TYPE_KEY) || !data.hasString(DESTINATION_ADDRESS_KEY)) return null
val address = data.getString(DESTINATION_ADDRESS_KEY)
val destination = when (data.getInt(DESTINATION_TYPE_KEY)) {
CONTACT_TYPE -> Destination.Contact(address)
GROUP_TYPE -> Destination.ClosedGroup(address)
else -> return null
}
return ConfigurationSyncJob(destination)
}
}
}

View File

@ -12,7 +12,7 @@ class GroupAvatarDownloadJob(val room: String, val server: String) : Job {
override var failureCount: Int = 0 override var failureCount: Int = 0
override val maxFailureCount: Int = 10 override val maxFailureCount: Int = 10
override fun execute() { override suspend fun execute() {
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage
val imageId = storage.getOpenGroup(room, server)?.imageId ?: return val imageId = storage.getOpenGroup(room, server)?.imageId ?: return
try { try {

View File

@ -17,7 +17,7 @@ interface Job {
internal const val MAX_BUFFER_SIZE = 1_000_000 // bytes internal const val MAX_BUFFER_SIZE = 1_000_000 // bytes
} }
fun execute() suspend fun execute()
fun serialize(): Data fun serialize(): Data

View File

@ -94,7 +94,7 @@ class JobQueue : JobDelegate {
} }
} }
private fun Job.process(dispatcherName: String) { private suspend fun Job.process(dispatcherName: String) {
Log.d(dispatcherName,"processJob: ${javaClass.simpleName}") Log.d(dispatcherName,"processJob: ${javaClass.simpleName}")
delegate = this@JobQueue delegate = this@JobQueue
execute() execute()

View File

@ -25,7 +25,7 @@ class MessageReceiveJob(val data: ByteArray, val serverHash: String? = null, val
private val OPEN_GROUP_ID_KEY = "open_group_id" private val OPEN_GROUP_ID_KEY = "open_group_id"
} }
override fun execute() { override suspend fun execute() {
executeAsync().get() executeAsync().get()
} }

View File

@ -10,7 +10,6 @@ import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.sending_receiving.MessageSender import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.Data import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.OnionRequestAPI
import org.session.libsignal.utilities.HTTP import org.session.libsignal.utilities.HTTP
import org.session.libsignal.utilities.Log import org.session.libsignal.utilities.Log
@ -33,7 +32,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
private val DESTINATION_KEY = "destination" private val DESTINATION_KEY = "destination"
} }
override fun execute() { override suspend fun execute() {
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val message = message as? VisibleMessage val message = message as? VisibleMessage
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage

View File

@ -8,15 +8,13 @@ import okhttp3.MediaType
import okhttp3.Request import okhttp3.Request
import okhttp3.RequestBody import okhttp3.RequestBody
import org.session.libsession.messaging.jobs.Job.Companion.MAX_BUFFER_SIZE import org.session.libsession.messaging.jobs.Job.Companion.MAX_BUFFER_SIZE
import org.session.libsession.messaging.sending_receiving.notifications.PushNotificationAPI import org.session.libsession.messaging.sending_receiving.notifications.PushNotificationAPI
import org.session.libsession.messaging.utilities.Data import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.SnodeMessage
import org.session.libsession.snode.OnionRequestAPI import org.session.libsession.snode.OnionRequestAPI
import org.session.libsession.snode.SnodeMessage
import org.session.libsession.snode.Version import org.session.libsession.snode.Version
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.JsonUtil import org.session.libsignal.utilities.JsonUtil
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.retryIfNeeded import org.session.libsignal.utilities.retryIfNeeded
class NotifyPNServerJob(val message: SnodeMessage) : Job { class NotifyPNServerJob(val message: SnodeMessage) : Job {
@ -32,7 +30,7 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
private val MESSAGE_KEY = "message" private val MESSAGE_KEY = "message"
} }
override fun execute() { override suspend fun execute() {
val server = PushNotificationAPI.server val server = PushNotificationAPI.server
val parameters = mapOf( "data" to message.data, "send_to" to message.recipient ) val parameters = mapOf( "data" to message.data, "send_to" to message.recipient )
val url = "${server}/notify" val url = "${server}/notify"

View File

@ -19,7 +19,7 @@ class OpenGroupDeleteJob(private val messageServerIds: LongArray, private val th
override var failureCount: Int = 0 override var failureCount: Int = 0
override val maxFailureCount: Int = 1 override val maxFailureCount: Int = 1
override fun execute() { override suspend fun execute() {
val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val numberToDelete = messageServerIds.size val numberToDelete = messageServerIds.size
Log.d(TAG, "Deleting $numberToDelete messages") Log.d(TAG, "Deleting $numberToDelete messages")

View File

@ -16,6 +16,7 @@ class SessionJobManagerFactories {
GroupAvatarDownloadJob.KEY to GroupAvatarDownloadJob.Factory(), GroupAvatarDownloadJob.KEY to GroupAvatarDownloadJob.Factory(),
BackgroundGroupAddJob.KEY to BackgroundGroupAddJob.Factory(), BackgroundGroupAddJob.KEY to BackgroundGroupAddJob.Factory(),
OpenGroupDeleteJob.KEY to OpenGroupDeleteJob.Factory(), OpenGroupDeleteJob.KEY to OpenGroupDeleteJob.Factory(),
ConfigurationSyncJob.KEY to ConfigurationSyncJob.Factory()
) )
} }
} }

View File

@ -20,7 +20,7 @@ class TrimThreadJob(val threadId: Long, val openGroupId: String?) : Job {
const val THREAD_LENGTH_TRIGGER_SIZE = 2000 const val THREAD_LENGTH_TRIGGER_SIZE = 2000
} }
override fun execute() { override suspend fun execute() {
val context = MessagingModuleConfiguration.shared.context val context = MessagingModuleConfiguration.shared.context
val trimmingEnabled = TextSecurePreferences.isThreadLengthTrimmingEnabled(context) val trimmingEnabled = TextSecurePreferences.isThreadLengthTrimmingEnabled(context)
val storage = MessagingModuleConfiguration.shared.storage val storage = MessagingModuleConfiguration.shared.storage