Performance improvements and bug fixes (#869)

* refactor: fail on testSnode instead of recursively using up snode list. add call timeout on http client

* refactor: refactoring batch message receives and pollers

* refactor: reduce thread utils pool count to a 2 thread fixed pool. Do a check against pubkey instead of room names for oxenHostedOpenGroup

* refactor: caching lib with potential loader fixes and no-cache for giphy

* refactor: remove store and instead use ConcurrentHashMap with a backing update coroutine

* refactor: queue trim thread jobs instead of add every message processed

* fix: wrapping auth token and initial sync for open groups in a threadutils queued runnable, getting initial sync times down

* fix: fixing the user contacts cache in ConversationAdapter.kt

* refactor: improve polling and initial sync, move group joins from config messages into a background job fetching image.

* refactor: improving the job queuing for open groups, replacing placeholder avatar generation with a custom glide loader and archiving initial sync of open groups

* feat: add OpenGroupDeleteJob.kt

* feat: add open group delete job to process deletions after batch adding

* feat: add vacuum and fix job queue re-adding jobs forever, only try to set message hash values in DB if they have changed

* refactor: remove redundant inflation for profile image views throughout app

* refactor(wip): reducing layout inflation and starting to refactor the open group deletion issues taking a long time

* refactor(wip): refactoring group deletion to not iterate through and delete messages individually

* refactor(wip): refactoring group deletion to not iterate through and delete messages individually

* fix: group deletion optimisation

* build: bump build number

* build: bump build number and fix batch message receive retry logic

* fix: clear out open group deletes

* fix: update visible ConversationAdapter.kt binding for initial contact fetching and better traces for debugging background jobs

* fix: add in check for / force sync latest encryption key pair from linked devices if we already have that closed group

* Rename .java to .kt

* refactor: change MmsDatabase to kotlin to make list operations easier

* fix: nullable type

* fix: compilation issues and constants in .kt instead of .java

* fix: bug fix expiration timer on closed group recipient

* feat: use the job queue properly across executors

* feat: start on open group dispatcher-specific logic, probably a queue factory based on openGroupId if that is the same across new message and deletion jobs to ensure consistent entry and removal

* refactor: removing redundant code and fixing jobqueue per opengroup

* fix: allow attachments in note to self

* fix: make the minWidth in quote view bind max of text / title and body, wrapped ?

* fix: fixing up layouts and code view layouts

* fix: remove TODO, remove timestamp binding

* feat: fix view logic, avatars and padding, downloading attachments lazily (on bind), fixing potential crash, add WindowDebouncer.kt

* fix: NPE on viewModel recipient from removed thread while tearing down the Recipient observer in ConversationActivityV2.kt

* refactor: replace conversation notification debouncer handler with handlerthread, same as conversation list debouncer

* refactor: UI for groups and poller improvements

* fix: revert some changes in poller

* feat: add header back in for message requests

* refactor: remove Trace calls, add more conditions to the HomeDiffUtil for updating more efficiently

* feat: try update the home adapter if we get a profile picture modified event

* feat: bump build numbers

* fix: try to start with list in homeViewModel if we don't have already, render quotes to be width of attachment slide view instead of fixed

* fix: set channel to be conflated instead of no buffer

* fix: set unreads based off last local user message vs incrementing unreads to be all amount

* feat: add profile update flag, update build number

* fix: link preview thumbnails download on bind

* fix: centercrop placeholder in glide request

* feat: recycle the contact selection list and profile image in unbind

* fix: try to prevent user KP crash at weird times

* fix: remove additional log, improve attachment download success rate, fix share logs dialog issue
This commit is contained in:
Harris
2022-06-08 17:12:34 +10:00
committed by GitHub
parent db92034a8a
commit 6ddefb7a2e
133 changed files with 3775 additions and 2357 deletions

View File

@@ -6,11 +6,11 @@ import android.net.Uri;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import org.session.libsession.messaging.MessagingModuleConfiguration;
import org.session.libsession.database.StorageProtocol;
import org.session.libsession.messaging.MessagingModuleConfiguration;
import org.session.libsession.utilities.Address;
import org.session.libsession.utilities.GroupRecord;
import org.session.libsession.utilities.Conversions;
import org.session.libsession.utilities.GroupRecord;
import org.session.libsignal.utilities.guava.Optional;
import java.io.ByteArrayInputStream;
@@ -31,7 +31,7 @@ public class GroupRecordContactPhoto implements ContactPhoto {
@Override
public InputStream openInputStream(Context context) throws IOException {
StorageProtocol groupDatabase = MessagingModuleConfiguration.shared.getStorage();
StorageProtocol groupDatabase = MessagingModuleConfiguration.getShared().getStorage();
Optional<GroupRecord> groupRecord = Optional.of(groupDatabase.getGroup(address.toGroupString()));
if (groupRecord.isPresent() && groupRecord.get().getAvatar() != null) {

View File

@@ -0,0 +1,13 @@
package org.session.libsession.avatars
import com.bumptech.glide.load.Key
import java.security.MessageDigest
class PlaceholderAvatarPhoto(val hashString: String,
val displayName: String): Key {
override fun updateDiskCacheKey(messageDigest: MessageDigest) {
messageDigest.update(hashString.encodeToByteArray())
messageDigest.update(displayName.encodeToByteArray())
}
}

View File

@@ -1,6 +1,11 @@
package org.session.libsession.database
import org.session.libsession.messaging.sending_receiving.attachments.*
import org.session.libsession.messaging.sending_receiving.attachments.Attachment
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentId
import org.session.libsession.messaging.sending_receiving.attachments.AttachmentState
import org.session.libsession.messaging.sending_receiving.attachments.DatabaseAttachment
import org.session.libsession.messaging.sending_receiving.attachments.SessionServiceAttachmentPointer
import org.session.libsession.messaging.sending_receiving.attachments.SessionServiceAttachmentStream
import org.session.libsession.utilities.Address
import org.session.libsession.utilities.UploadResult
import org.session.libsession.utilities.recipients.Recipient
@@ -11,6 +16,9 @@ import java.io.InputStream
interface MessageDataProvider {
fun getMessageID(serverID: Long): Long?
/**
* @return pair of sms or mms table-specific ID and whether it is in SMS table
*/
fun getMessageID(serverId: Long, threadId: Long): Pair<Long, Boolean>?
fun deleteMessage(messageID: Long, isSms: Boolean)
fun updateMessageAsDeleted(timestamp: Long, author: String)

View File

@@ -58,6 +58,8 @@ interface StorageProtocol {
fun getAllV2OpenGroups(): Map<Long, OpenGroupV2>
fun getV2OpenGroup(threadId: Long): OpenGroupV2?
fun addOpenGroup(urlAsString: String)
fun onOpenGroupAdded(urlAsString: String)
fun hasBackgroundGroupAddJob(groupJoinUrl: String): Boolean
fun setOpenGroupServerMessageID(messageID: Long, serverID: Long, threadID: Long, isSms: Boolean)
// Open Group Public Keys
@@ -155,7 +157,10 @@ interface StorageProtocol {
/**
* Returns the ID of the `TSIncomingMessage` that was constructed.
*/
fun persist(message: VisibleMessage, quotes: QuoteModel?, linkPreview: List<LinkPreview?>, groupPublicKey: String?, openGroupID: String?, attachments: List<Attachment>): Long?
fun persist(message: VisibleMessage, quotes: QuoteModel?, linkPreview: List<LinkPreview?>, groupPublicKey: String?, openGroupID: String?, attachments: List<Attachment>, runIncrement: Boolean, runThreadUpdate: Boolean): Long?
fun markConversationAsRead(threadId: Long, updateLastSeen: Boolean)
fun incrementUnread(threadId: Long, amount: Int)
fun updateThread(threadId: Long, unarchive: Boolean)
fun insertDataExtractionNotificationMessage(senderPublicKey: String, message: DataExtractionNotificationInfoMessage, sentTimestamp: Long)
fun insertMessageRequestResponse(response: MessageRequestResponse)
fun setRecipientApproved(recipient: Recipient, approved: Boolean)

View File

@@ -13,13 +13,17 @@ class MessagingModuleConfiguration(
) {
companion object {
lateinit var shared: MessagingModuleConfiguration
@JvmStatic
val shared: MessagingModuleConfiguration
get() = context.getSystemService(MESSAGING_MODULE_SERVICE) as MessagingModuleConfiguration
fun configure(context: Context, storage: StorageProtocol,
messageDataProvider: MessageDataProvider, keyPairProvider: () -> KeyPair?
) {
if (Companion::shared.isInitialized) { return }
shared = MessagingModuleConfiguration(context, storage, messageDataProvider, keyPairProvider)
const val MESSAGING_MODULE_SERVICE: String = "MessagingModuleConfiguration_MESSAGING_MODULE_SERVICE"
private lateinit var context: Context
@JvmStatic
fun configure(context: Context) {
this.context = context
}
}
}

View File

@@ -17,7 +17,6 @@ import org.session.libsignal.utilities.Log
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
import java.lang.NullPointerException
class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) : Job {
override var delegate: JobDelegate? = null
@@ -33,7 +32,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
}
// Settings
override val maxFailureCount: Int = 100
override val maxFailureCount: Int = 2
companion object {
val KEY: String = "AttachmentDownloadJob"
@@ -54,12 +53,23 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
|| exception == Error.NoSender
|| (exception is OnionRequestAPI.HTTPRequestFailedAtDestinationException && exception.statusCode == 400)) {
attachment?.let { id ->
Log.d("AttachmentDownloadJob", "Setting attachment state = failed, have attachment")
messageDataProvider.setAttachmentState(AttachmentState.FAILED, id, databaseMessageID)
} ?: run {
Log.d("AttachmentDownloadJob", "Setting attachment state = failed, don't have attachment")
messageDataProvider.setAttachmentState(AttachmentState.FAILED, AttachmentId(attachmentID,0), databaseMessageID)
}
this.handlePermanentFailure(exception)
} else {
if (failureCount + 1 >= maxFailureCount) {
attachment?.let { id ->
Log.d("AttachmentDownloadJob", "Setting attachment state = failed from max failure count, have attachment")
messageDataProvider.setAttachmentState(AttachmentState.FAILED, id, databaseMessageID)
} ?: run {
Log.d("AttachmentDownloadJob", "Setting attachment state = failed from max failure count, don't have attachment")
messageDataProvider.setAttachmentState(AttachmentState.FAILED, AttachmentId(attachmentID,0), databaseMessageID)
}
}
this.handleFailure(exception)
}
}
@@ -98,16 +108,20 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
tempFile = createTempFile()
val openGroupV2 = storage.getV2OpenGroup(threadID)
if (openGroupV2 == null) {
Log.d("AttachmentDownloadJob", "downloading normal attachment")
DownloadUtilities.downloadFile(tempFile, attachment.url)
} else {
Log.d("AttachmentDownloadJob", "downloading open group attachment")
val url = HttpUrl.parse(attachment.url)!!
val fileID = url.pathSegments().last()
OpenGroupAPIV2.download(fileID.toLong(), openGroupV2.room, openGroupV2.server).get().let {
tempFile.writeBytes(it)
}
}
Log.d("AttachmentDownloadJob", "getting input stream")
val inputStream = getInputStream(tempFile, attachment)
Log.d("AttachmentDownloadJob", "inserting attachment")
messageDataProvider.insertAttachment(databaseMessageID, attachment.attachmentId, inputStream)
if (attachment.contentType.startsWith("audio/")) {
// process the duration
@@ -124,9 +138,12 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
Log.e("Loki", "Couldn't process audio attachment", e)
}
}
Log.d("AttachmentDownloadJob", "deleting tempfile")
tempFile.delete()
Log.d("AttachmentDownloadJob", "succeeding job")
handleSuccess()
} catch (e: Exception) {
Log.e("AttachmentDownloadJob", "Error processing attachment download", e)
tempFile?.delete()
return handleFailure(e,null)
}
@@ -135,8 +152,10 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
private fun getInputStream(tempFile: File, attachment: DatabaseAttachment): InputStream {
// Assume we're retrieving an attachment for an open group server if the digest is not set
return if (attachment.digest?.size ?: 0 == 0 || attachment.key.isNullOrEmpty()) {
Log.d("AttachmentDownloadJob", "getting input stream with no attachment digest")
FileInputStream(tempFile)
} else {
Log.d("AttachmentDownloadJob", "getting input stream with attachment digest")
AttachmentCipherInputStream.createForAttachment(tempFile, attachment.size, Base64.decode(attachment.key), attachment.digest)
}
}

View File

@@ -0,0 +1,79 @@
package org.session.libsession.messaging.jobs
import okhttp3.HttpUrl
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
import org.session.libsession.messaging.open_groups.OpenGroupV2
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.utilities.Log
class BackgroundGroupAddJob(val joinUrl: String): Job {
companion object {
const val KEY = "BackgroundGroupAddJob"
private const val JOIN_URL = "joinUri"
}
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
override val maxFailureCount: Int = 1
val openGroupId: String? get() {
val url = HttpUrl.parse(joinUrl) ?: return null
val server = OpenGroupV2.getServer(joinUrl)?.toString()?.removeSuffix("/") ?: return null
val room = url.pathSegments().firstOrNull() ?: return null
return "$server.$room"
}
override fun execute() {
try {
val storage = MessagingModuleConfiguration.shared.storage
val allV2OpenGroups = storage.getAllV2OpenGroups().map { it.value.joinURL }
if (allV2OpenGroups.contains(joinUrl)) {
Log.e("OpenGroupDispatcher", "Failed to add group because",DuplicateGroupException())
delegate?.handleJobFailed(this, DuplicateGroupException())
return
}
// get image
val url = HttpUrl.parse(joinUrl) ?: throw Exception("Group joinUrl isn't valid")
val server = OpenGroupV2.getServer(joinUrl)
val serverString = server.toString().removeSuffix("/")
val publicKey = url.queryParameter("public_key") ?: throw Exception("Group public key isn't valid")
val room = url.pathSegments().firstOrNull() ?: throw Exception("Group room isn't valid")
storage.setOpenGroupPublicKey(serverString,publicKey)
val bytes = OpenGroupAPIV2.downloadOpenGroupProfilePicture(url.pathSegments().firstOrNull()!!, serverString).get()
val groupId = GroupUtil.getEncodedOpenGroupID("$server.$room".toByteArray())
// get info and auth token
storage.addOpenGroup(joinUrl)
storage.updateProfilePicture(groupId, bytes)
storage.updateTimestampUpdated(groupId, System.currentTimeMillis())
storage.onOpenGroupAdded(joinUrl)
} catch (e: Exception) {
Log.e("OpenGroupDispatcher", "Failed to add group because",e)
delegate?.handleJobFailed(this, e)
return
}
Log.d("Loki", "Group added successfully")
delegate?.handleJobSucceeded(this)
}
override fun serialize(): Data = Data.Builder()
.putString(JOIN_URL, joinUrl)
.build()
override fun getFactoryKey(): String = KEY
class DuplicateGroupException: Exception("Current open groups already contains this group")
class Factory : Job.Factory<BackgroundGroupAddJob> {
override fun create(data: Data): BackgroundGroupAddJob {
return BackgroundGroupAddJob(
data.getString(JOIN_URL)
)
}
}
}

View File

@@ -1,11 +1,23 @@
package org.session.libsession.messaging.jobs
import com.google.protobuf.ByteString
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.task
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.messages.visible.ParsedMessage
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.sending_receiving.MessageReceiver
import org.session.libsession.messaging.sending_receiving.handle
import org.session.libsession.messaging.sending_receiving.handleVisibleMessage
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.utilities.SSKEnvironment
import org.session.libsignal.protos.UtilProtos
import org.session.libsignal.utilities.Log
@@ -23,7 +35,7 @@ class BatchMessageReceiveJob(
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
override val maxFailureCount: Int = 10
override val maxFailureCount: Int = 1 // handled in JobQueue onJobFailed
// Failure Exceptions must be retryable if they're a MessageReceiver.Error
val failures = mutableListOf<MessageReceiveParameters>()
@@ -31,7 +43,7 @@ class BatchMessageReceiveJob(
const val TAG = "BatchMessageReceiveJob"
const val KEY = "BatchMessageReceiveJob"
const val BATCH_DEFAULT_NUMBER = 50
const val BATCH_DEFAULT_NUMBER = 512
// Keys used for database storage
private val NUM_MESSAGES_KEY = "numMessages"
@@ -41,18 +53,39 @@ class BatchMessageReceiveJob(
private val OPEN_GROUP_ID_KEY = "open_group_id"
}
private fun getThreadId(message: Message, storage: StorageProtocol): Long {
val senderOrSync = when (message) {
is VisibleMessage -> message.syncTarget ?: message.sender!!
is ExpirationTimerUpdate -> message.syncTarget ?: message.sender!!
else -> message.sender!!
}
return storage.getOrCreateThreadIdFor(senderOrSync, message.groupPublicKey, openGroupID)
}
override fun execute() {
executeAsync().get()
}
fun executeAsync(): Promise<Unit, Exception> {
return task {
messages.iterator().forEach { messageParameters ->
val threadMap = mutableMapOf<Long, MutableList<ParsedMessage>>()
val storage = MessagingModuleConfiguration.shared.storage
val context = MessagingModuleConfiguration.shared.context
val localUserPublicKey = storage.getUserPublicKey()
// parse and collect IDs
messages.forEach { messageParameters ->
val (data, serverHash, openGroupMessageServerID) = messageParameters
try {
val (message, proto) = MessageReceiver.parse(data, openGroupMessageServerID)
message.serverHash = serverHash
MessageReceiver.handle(message, proto, this.openGroupID)
val threadID = getThreadId(message, storage)
val parsedParams = ParsedMessage(messageParameters, message, proto)
if (!threadMap.containsKey(threadID)) {
threadMap[threadID] = mutableListOf(parsedParams)
} else {
threadMap[threadID]!! += parsedParams
}
} catch (e: Exception) {
Log.e(TAG, "Couldn't receive message.", e)
if (e is MessageReceiver.Error && !e.isRetryable) {
@@ -63,6 +96,53 @@ class BatchMessageReceiveJob(
}
}
}
// iterate over threads and persist them (persistence is the longest constant in the batch process operation)
runBlocking(Dispatchers.IO) {
val deferredThreadMap = threadMap.entries.map { (threadId, messages) ->
async {
val messageIds = mutableListOf<Pair<Long, Boolean>>()
messages.forEach { (parameters, message, proto) ->
try {
if (message is VisibleMessage) {
val messageId = MessageReceiver.handleVisibleMessage(message, proto, openGroupID,
runIncrement = false,
runThreadUpdate = false,
runProfileUpdate = true
)
if (messageId != null) {
messageIds += messageId to (message.sender == localUserPublicKey)
}
} else {
MessageReceiver.handle(message, proto, openGroupID)
}
} catch (e: Exception) {
Log.e(TAG, "Couldn't process message.", e)
if (e is MessageReceiver.Error && !e.isRetryable) {
Log.e(TAG, "Message failed permanently",e)
} else {
Log.e(TAG, "Message failed",e)
failures += parameters
}
}
}
// increment unreads, notify, and update thread
val unreadFromMine = messageIds.indexOfLast { (_,fromMe) -> fromMe }
var trueUnreadCount = messageIds.size
if (unreadFromMine >= 0) {
trueUnreadCount -= (unreadFromMine + 1)
storage.markConversationAsRead(threadId, false)
}
SSKEnvironment.shared.notificationManager.updateNotification(context, threadId)
storage.incrementUnread(threadId, trueUnreadCount)
storage.updateThread(threadId, true)
}
}
// await all thread processing
deferredThreadMap.awaitAll()
}
if (failures.isEmpty()) {
handleSuccess()
} else {

View File

@@ -2,7 +2,7 @@ package org.session.libsession.messaging.jobs
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.Channel
@@ -25,46 +25,128 @@ class JobQueue : JobDelegate {
private var hasResumedPendingJobs = false // Just for debugging
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()
private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val rxMediaDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
private val openGroupDispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher()
private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val scope = GlobalScope + SupervisorJob()
private val scope = CoroutineScope(Dispatchers.Default) + SupervisorJob()
private val queue = Channel<Job>(UNLIMITED)
private val pendingJobIds = mutableSetOf<String>()
private val pendingTrimThreadIds = mutableSetOf<Long>()
private val openGroupChannels = mutableMapOf<String, Channel<Job>>()
val timer = Timer()
private fun CoroutineScope.processWithDispatcher(
private fun CoroutineScope.processWithOpenGroupDispatcher(
channel: Channel<Job>,
dispatcher: CoroutineDispatcher
dispatcher: CoroutineDispatcher,
name: String
) = launch(dispatcher) {
for (job in channel) {
if (!isActive) break
job.delegate = this@JobQueue
job.execute()
val openGroupId = when (job) {
is BatchMessageReceiveJob -> job.openGroupID
is OpenGroupDeleteJob -> job.openGroupId
is TrimThreadJob -> job.openGroupId
is BackgroundGroupAddJob -> job.openGroupId
is GroupAvatarDownloadJob -> "${job.server}.${job.room}"
else -> null
}
if (openGroupId.isNullOrEmpty()) {
Log.e("OpenGroupDispatcher", "Open Group ID was null on ${job.javaClass.simpleName}")
handleJobFailedPermanently(job, NullPointerException("Open Group ID was null"))
} else {
val groupChannel = if (!openGroupChannels.containsKey(openGroupId)) {
Log.d("OpenGroupDispatcher", "Creating $openGroupId channel")
val newGroupChannel = Channel<Job>(UNLIMITED)
launch(dispatcher) {
for (groupJob in newGroupChannel) {
if (!isActive) break
groupJob.process(name)
}
}
openGroupChannels[openGroupId] = newGroupChannel
newGroupChannel
} else {
Log.d("OpenGroupDispatcher", "Re-using channel")
openGroupChannels[openGroupId]!!
}
Log.d("OpenGroupDispatcher", "Sending to channel $groupChannel")
groupChannel.send(job)
}
}
}
private fun CoroutineScope.processWithDispatcher(
channel: Channel<Job>,
dispatcher: CoroutineDispatcher,
name: String,
asynchronous: Boolean = true
) = launch(dispatcher) {
for (job in channel) {
if (!isActive) break
if (asynchronous) {
launch(dispatcher) {
job.process(name)
}
} else {
job.process(name)
}
}
}
private fun Job.process(dispatcherName: String) {
Log.d(dispatcherName,"processJob: ${javaClass.simpleName}")
delegate = this@JobQueue
execute()
}
init {
// Process jobs
scope.launch {
val rxQueue = Channel<Job>(capacity = 4096)
val txQueue = Channel<Job>(capacity = 4096)
val rxQueue = Channel<Job>(capacity = UNLIMITED)
val txQueue = Channel<Job>(capacity = UNLIMITED)
val mediaQueue = Channel<Job>(capacity = UNLIMITED)
val openGroupQueue = Channel<Job>(capacity = UNLIMITED)
val receiveJob = processWithDispatcher(rxQueue, rxDispatcher)
val txJob = processWithDispatcher(txQueue, txDispatcher)
val receiveJob = processWithDispatcher(rxQueue, rxDispatcher, "rx", asynchronous = false)
val txJob = processWithDispatcher(txQueue, txDispatcher, "tx")
val mediaJob = processWithDispatcher(mediaQueue, rxMediaDispatcher, "media")
val openGroupJob = processWithOpenGroupDispatcher(openGroupQueue, openGroupDispatcher, "openGroup")
while (isActive) {
for (job in queue) {
when (job) {
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> {
txQueue.send(job)
}
is MessageReceiveJob, is TrimThreadJob, is BatchMessageReceiveJob,
is AttachmentDownloadJob, is GroupAvatarDownloadJob -> {
if (queue.isEmpty && pendingTrimThreadIds.isNotEmpty()) {
// process trim thread jobs
val pendingThreads = pendingTrimThreadIds.toList()
pendingTrimThreadIds.clear()
for (thread in pendingThreads) {
Log.d("Loki", "Trimming thread $thread")
queue.trySend(TrimThreadJob(thread, null))
}
}
when (val job = queue.receive()) {
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> {
txQueue.send(job)
}
is AttachmentDownloadJob -> {
mediaQueue.send(job)
}
is GroupAvatarDownloadJob,
is BackgroundGroupAddJob,
is OpenGroupDeleteJob -> {
openGroupQueue.send(job)
}
is MessageReceiveJob, is TrimThreadJob,
is BatchMessageReceiveJob -> {
if ((job is BatchMessageReceiveJob && !job.openGroupID.isNullOrEmpty())
|| (job is TrimThreadJob && !job.openGroupId.isNullOrEmpty())) {
openGroupQueue.send(job)
} else {
rxQueue.send(job)
}
else -> {
throw IllegalStateException("Unexpected job type.")
}
}
else -> {
throw IllegalStateException("Unexpected job type.")
}
}
}
@@ -72,7 +154,8 @@ class JobQueue : JobDelegate {
// The job has been cancelled
receiveJob.cancel()
txJob.cancel()
mediaJob.cancel()
openGroupJob.cancel()
}
}
@@ -82,6 +165,10 @@ class JobQueue : JobDelegate {
val shared: JobQueue by lazy { JobQueue() }
}
fun queueThreadForTrim(threadId: Long) {
pendingTrimThreadIds += threadId
}
fun add(job: Job) {
addWithoutExecuting(job)
queue.trySend(job) // offer always called on unlimited capacity
@@ -141,7 +228,9 @@ class JobQueue : JobDelegate {
MessageSendJob.KEY,
NotifyPNServerJob.KEY,
BatchMessageReceiveJob.KEY,
GroupAvatarDownloadJob.KEY
GroupAvatarDownloadJob.KEY,
BackgroundGroupAddJob.KEY,
OpenGroupDeleteJob.KEY,
)
allJobTypes.forEach { type ->
resumePendingJobs(type)
@@ -165,13 +254,20 @@ class JobQueue : JobDelegate {
Log.i("Loki", "Message send job waiting for attachment upload to finish.")
return
}
// Batch message receive job, re-queue non-permanently failed jobs
if (job is BatchMessageReceiveJob) {
val replacementParameters = job.failures
if (job is BatchMessageReceiveJob && job.failureCount <= 0) {
val replacementParameters = job.failures.toList()
if (replacementParameters.isNotEmpty()) {
val newJob = BatchMessageReceiveJob(replacementParameters, job.openGroupID)
newJob.failureCount = job.failureCount + 1
add(newJob)
}
}
// Regular job failure
job.failureCount += 1
if (job.failureCount >= job.maxFailureCount) {
handleJobFailedPermanently(job, error)
} else {

View File

@@ -0,0 +1,51 @@
package org.session.libsession.messaging.jobs
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.utilities.Data
import org.session.libsignal.utilities.Log
class OpenGroupDeleteJob(private val messageServerIds: LongArray, private val threadId: Long, val openGroupId: String): Job {
companion object {
private const val TAG = "OpenGroupDeleteJob"
const val KEY = "OpenGroupDeleteJob"
private const val MESSAGE_IDS = "messageIds"
private const val THREAD_ID = "threadId"
private const val OPEN_GROUP_ID = "openGroupId"
}
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
override val maxFailureCount: Int = 1
override fun execute() {
val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val numberToDelete = messageServerIds.size
Log.d(TAG, "Deleting $numberToDelete messages")
messageServerIds.forEach { serverId ->
val (messageId, isSms) = dataProvider.getMessageID(serverId, threadId) ?: return@forEach
dataProvider.deleteMessage(messageId, isSms)
}
Log.d(TAG, "Deleted $numberToDelete messages successfully")
delegate?.handleJobSucceeded(this)
}
override fun serialize(): Data = Data.Builder()
.putLongArray(MESSAGE_IDS, messageServerIds)
.putLong(THREAD_ID, threadId)
.putString(OPEN_GROUP_ID, openGroupId)
.build()
override fun getFactoryKey(): String = KEY
class Factory: Job.Factory<OpenGroupDeleteJob> {
override fun create(data: Data): OpenGroupDeleteJob {
val messageServerIds = data.getLongArray(MESSAGE_IDS)
val threadId = data.getLong(THREAD_ID)
val openGroupId = data.getString(OPEN_GROUP_ID)
return OpenGroupDeleteJob(messageServerIds, threadId, openGroupId)
}
}
}

View File

@@ -13,7 +13,9 @@ class SessionJobManagerFactories {
NotifyPNServerJob.KEY to NotifyPNServerJob.Factory(),
TrimThreadJob.KEY to TrimThreadJob.Factory(),
BatchMessageReceiveJob.KEY to BatchMessageReceiveJob.Factory(),
GroupAvatarDownloadJob.KEY to GroupAvatarDownloadJob.Factory()
GroupAvatarDownloadJob.KEY to GroupAvatarDownloadJob.Factory(),
BackgroundGroupAddJob.KEY to BackgroundGroupAddJob.Factory(),
OpenGroupDeleteJob.KEY to OpenGroupDeleteJob.Factory(),
)
}
}

View File

@@ -4,7 +4,7 @@ import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.utilities.TextSecurePreferences
class TrimThreadJob(val threadId: Long) : Job {
class TrimThreadJob(val threadId: Long, val openGroupId: String?) : Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
@@ -14,6 +14,7 @@ class TrimThreadJob(val threadId: Long) : Job {
companion object {
const val KEY: String = "TrimThreadJob"
const val THREAD_ID = "thread_id"
const val OPEN_GROUP_ID = "open_group"
}
override fun execute() {
@@ -27,9 +28,12 @@ class TrimThreadJob(val threadId: Long) : Job {
}
override fun serialize(): Data {
return Data.Builder()
val builder = Data.Builder()
.putLong(THREAD_ID, threadId)
.build()
if (!openGroupId.isNullOrEmpty()) {
builder.putString(OPEN_GROUP_ID, openGroupId)
}
return builder.build()
}
override fun getFactoryKey(): String = "TrimThreadJob"
@@ -37,7 +41,7 @@ class TrimThreadJob(val threadId: Long) : Job {
class Factory : Job.Factory<TrimThreadJob> {
override fun create(data: Data): TrimThreadJob {
return TrimThreadJob(data.getLong(THREAD_ID))
return TrimThreadJob(data.getLong(THREAD_ID), data.getStringOrDefault(OPEN_GROUP_ID, null))
}
}

View File

@@ -0,0 +1,11 @@
package org.session.libsession.messaging.messages.visible
import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.messaging.messages.Message
import org.session.libsignal.protos.SignalServiceProtos
data class ParsedMessage(
val parameters: MessageReceiveParameters,
val message: Message,
val proto: SignalServiceProtos.Content
)

View File

@@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.PropertyNamingStrategy
import com.fasterxml.jackson.databind.annotation.JsonNaming
import com.fasterxml.jackson.databind.type.TypeFactory
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.bind
@@ -18,11 +17,22 @@ import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPolle
import org.session.libsession.snode.OnionRequestAPI
import org.session.libsession.utilities.AESGCM
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.utilities.*
import org.session.libsignal.utilities.Base64.*
import org.session.libsignal.utilities.HTTP.Verb.*
import org.session.libsignal.utilities.Base64.decode
import org.session.libsignal.utilities.Base64.encodeBytes
import org.session.libsignal.utilities.HTTP
import org.session.libsignal.utilities.HTTP.Verb.DELETE
import org.session.libsignal.utilities.HTTP.Verb.GET
import org.session.libsignal.utilities.HTTP.Verb.POST
import org.session.libsignal.utilities.HTTP.Verb.PUT
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.JsonUtil
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.removing05PrefixIfNeeded
import org.session.libsignal.utilities.toHexString
import org.whispersystems.curve25519.Curve25519
import java.util.*
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.set
object OpenGroupAPIV2 {
private val moderators: HashMap<String, Set<String>> = hashMapOf() // Server URL to (channel ID to set of moderator IDs)
@@ -38,7 +48,7 @@ object OpenGroupAPIV2 {
now - lastOpenDate
}
private const val defaultServerPublicKey = "a03c383cf63c3c4efe67acc52112a6dd734b3a946b9545f488aaa93da7991238"
const val defaultServerPublicKey = "a03c383cf63c3c4efe67acc52112a6dd734b3a946b9545f488aaa93da7991238"
const val defaultServer = "http://116.203.70.33"
sealed class Error(message: String) : Exception(message) {
@@ -168,6 +178,9 @@ object OpenGroupAPIV2 {
.success { authToken ->
storage.setAuthToken(room, server, authToken)
}
.fail { exception ->
Log.e("Loki", "Failed to get auth token", exception)
}
}
}

View File

@@ -1,8 +1,9 @@
package org.session.libsession.messaging.open_groups
import okhttp3.HttpUrl
import org.session.libsignal.utilities.JsonUtil
import org.session.libsignal.utilities.Log
import java.util.*
import java.util.Locale
data class OpenGroupV2(
val server: String,
@@ -37,6 +38,15 @@ data class OpenGroupV2(
}
}
fun getServer(urlAsString: String): HttpUrl? {
val url = HttpUrl.parse(urlAsString) ?: return null
val builder = HttpUrl.Builder().scheme(url.scheme()).host(url.host())
if (url.port() != 80 || url.port() != 443) {
// Non-standard port; add to server
builder.port(url.port())
}
return builder.build()
}
}
fun toJson(): Map<String,String> = mapOf(

View File

@@ -2,10 +2,19 @@ package org.session.libsession.messaging.sending_receiving
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.*
import org.session.libsession.messaging.messages.control.CallMessage
import org.session.libsession.messaging.messages.control.ClosedGroupControlMessage
import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.control.DataExtractionNotification
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.messages.control.MessageRequestResponse
import org.session.libsession.messaging.messages.control.ReadReceipt
import org.session.libsession.messaging.messages.control.TypingIndicator
import org.session.libsession.messaging.messages.control.UnsendRequest
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsignal.crypto.PushTransportDetails
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.utilities.Log
object MessageReceiver {
@@ -38,7 +47,9 @@ object MessageReceiver {
// Parse the envelope
val envelope = SignalServiceProtos.Envelope.parseFrom(data)
// Decrypt the contents
val ciphertext = envelope.content ?: throw Error.NoData
val ciphertext = envelope.content ?: run {
throw Error.NoData
}
var plaintext: ByteArray? = null
var sender: String? = null
var groupPublicKey: String? = null
@@ -59,7 +70,9 @@ object MessageReceiver {
throw Error.InvalidGroupPublicKey
}
val encryptionKeyPairs = MessagingModuleConfiguration.shared.storage.getClosedGroupEncryptionKeyPairs(hexEncodedGroupPublicKey)
if (encryptionKeyPairs.isEmpty()) { throw Error.NoGroupKeyPair }
if (encryptionKeyPairs.isEmpty()) {
throw Error.NoGroupKeyPair
}
// Loop through all known group key pairs in reverse order (i.e. try the latest key pair first (which'll more than
// likely be the one we want) but try older ones in case that didn't work)
var encryptionKeyPair = encryptionKeyPairs.removeLast()
@@ -73,6 +86,7 @@ object MessageReceiver {
encryptionKeyPair = encryptionKeyPairs.removeLast()
decrypt()
} else {
Log.e("Loki", "Failed to decrypt group message", e)
throw e
}
}
@@ -80,11 +94,15 @@ object MessageReceiver {
groupPublicKey = envelope.source
decrypt()
}
else -> throw Error.UnknownEnvelopeType
else -> {
throw Error.UnknownEnvelopeType
}
}
}
// Don't process the envelope any further if the sender is blocked
if (isBlocked(sender!!)) throw Error.SenderBlocked
if (isBlocked(sender!!)) {
throw Error.SenderBlocked
}
// Parse the proto
val proto = SignalServiceProtos.Content.parseFrom(PushTransportDetails.getStrippedPaddingMessageBody(plaintext))
// Parse the message
@@ -97,11 +115,17 @@ object MessageReceiver {
UnsendRequest.fromProto(proto) ?:
MessageRequestResponse.fromProto(proto) ?:
CallMessage.fromProto(proto) ?:
VisibleMessage.fromProto(proto) ?: throw Error.UnknownMessage
VisibleMessage.fromProto(proto) ?: run {
throw Error.UnknownMessage
}
// Ignore self send if needed
if (!message.isSelfSendValid && sender == userPublicKey) throw Error.SelfSend
if (!message.isSelfSendValid && sender == userPublicKey) {
throw Error.SelfSend
}
// Guard against control messages in open groups
if (isOpenGroupMessage && message !is VisibleMessage) throw Error.InvalidMessage
if (isOpenGroupMessage && message !is VisibleMessage) {
throw Error.InvalidMessage
}
// Finish parsing
message.sender = sender
message.recipient = userPublicKey
@@ -112,7 +136,9 @@ object MessageReceiver {
// Validate
var isValid = message.isValid()
if (message is VisibleMessage && !isValid && proto.dataMessage.attachmentsCount != 0) { isValid = true }
if (!isValid) { throw Error.InvalidMessage }
if (!isValid) {
throw Error.InvalidMessage
}
// If the message failed to process the first time around we retry it later (if the error is retryable). In this case the timestamp
// will already be in the database but we don't want to treat the message as a duplicate. The isRetry flag is a simple workaround
// for this issue.

View File

@@ -3,7 +3,7 @@ package org.session.libsession.messaging.sending_receiving
import android.text.TextUtils
import org.session.libsession.avatars.AvatarHelper
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.AttachmentDownloadJob
import org.session.libsession.messaging.jobs.BackgroundGroupAddJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.CallMessage
@@ -61,7 +61,11 @@ fun MessageReceiver.handle(message: Message, proto: SignalServiceProtos.Content,
is ConfigurationMessage -> handleConfigurationMessage(message)
is UnsendRequest -> handleUnsendRequest(message)
is MessageRequestResponse -> handleMessageRequestResponse(message)
is VisibleMessage -> handleVisibleMessage(message, proto, openGroupID)
is VisibleMessage -> handleVisibleMessage(message, proto, openGroupID,
runIncrement = true,
runThreadUpdate = true,
runProfileUpdate = true
)
is CallMessage -> handleCallMessage(message)
}
}
@@ -138,10 +142,13 @@ private fun handleConfigurationMessage(message: ConfigurationMessage) {
TextSecurePreferences.setConfigurationMessageSynced(context, true)
TextSecurePreferences.setLastProfileUpdateTime(context, message.sentTimestamp!!)
if (firstTimeSync) {
val allClosedGroupPublicKeys = storage.getAllClosedGroupPublicKeys()
for (closedGroup in message.closedGroups) {
if (allClosedGroupPublicKeys.contains(closedGroup.publicKey)) continue
val allClosedGroupPublicKeys = storage.getAllClosedGroupPublicKeys()
for (closedGroup in message.closedGroups) {
if (allClosedGroupPublicKeys.contains(closedGroup.publicKey)) {
// just handle the closed group encryption key pairs to avoid sync'd devices getting out of sync
storage.addClosedGroupEncryptionKeyPair(closedGroup.encryptionKeyPair!!, closedGroup.publicKey)
} else if (firstTimeSync) {
// only handle new closed group if it's first time sync
handleNewClosedGroup(message.sender!!, message.sentTimestamp!!, closedGroup.publicKey, closedGroup.name,
closedGroup.encryptionKeyPair!!, closedGroup.members, closedGroup.admins, message.sentTimestamp!!, closedGroup.expirationTimer)
}
@@ -149,7 +156,11 @@ private fun handleConfigurationMessage(message: ConfigurationMessage) {
val allV2OpenGroups = storage.getAllV2OpenGroups().map { it.value.joinURL }
for (openGroup in message.openGroups) {
if (allV2OpenGroups.contains(openGroup)) continue
storage.addOpenGroup(openGroup)
Log.d("OpenGroup", "All open groups doesn't contain $openGroup")
if (!storage.hasBackgroundGroupAddJob(openGroup)) {
Log.d("OpenGroup", "Doesn't contain background job for $openGroup, adding")
JobQueue.shared.add(BackgroundGroupAddJob(openGroup))
}
}
val profileManager = SSKEnvironment.shared.profileManager
val recipient = Recipient.from(context, Address.fromSerialized(userPublicKey), false)
@@ -192,8 +203,12 @@ fun handleMessageRequestResponse(message: MessageRequestResponse) {
}
//endregion
// region Visible Messages
fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalServiceProtos.Content, openGroupID: String?) {
fun MessageReceiver.handleVisibleMessage(message: VisibleMessage,
proto: SignalServiceProtos.Content,
openGroupID: String?,
runIncrement: Boolean,
runThreadUpdate: Boolean,
runProfileUpdate: Boolean): Long? {
val storage = MessagingModuleConfiguration.shared.storage
val context = MessagingModuleConfiguration.shared.context
val userPublicKey = storage.getUserPublicKey()
@@ -209,23 +224,25 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
}
// Update profile if needed
val recipient = Recipient.from(context, Address.fromSerialized(messageSender!!), false)
val profile = message.profile
if (profile != null && userPublicKey != messageSender) {
val profileManager = SSKEnvironment.shared.profileManager
val name = profile.displayName!!
if (name.isNotEmpty()) {
profileManager.setName(context, recipient, name)
}
val newProfileKey = profile.profileKey
if (runProfileUpdate) {
val profile = message.profile
if (profile != null && userPublicKey != messageSender) {
val profileManager = SSKEnvironment.shared.profileManager
val name = profile.displayName!!
if (name.isNotEmpty()) {
profileManager.setName(context, recipient, name)
}
val newProfileKey = profile.profileKey
val needsProfilePicture = !AvatarHelper.avatarFileExists(context, Address.fromSerialized(messageSender))
val profileKeyValid = newProfileKey?.isNotEmpty() == true && (newProfileKey.size == 16 || newProfileKey.size == 32) && profile.profilePictureURL?.isNotEmpty() == true
val profileKeyChanged = (recipient.profileKey == null || !MessageDigest.isEqual(recipient.profileKey, newProfileKey))
val needsProfilePicture = !AvatarHelper.avatarFileExists(context, Address.fromSerialized(messageSender))
val profileKeyValid = newProfileKey?.isNotEmpty() == true && (newProfileKey.size == 16 || newProfileKey.size == 32) && profile.profilePictureURL?.isNotEmpty() == true
val profileKeyChanged = (recipient.profileKey == null || !MessageDigest.isEqual(recipient.profileKey, newProfileKey))
if ((profileKeyValid && profileKeyChanged) || (profileKeyValid && needsProfilePicture)) {
profileManager.setProfileKey(context, recipient, newProfileKey!!)
profileManager.setUnidentifiedAccessMode(context, recipient, Recipient.UnidentifiedAccessMode.UNKNOWN)
profileManager.setProfilePictureURL(context, recipient, profile.profilePictureURL!!)
if ((profileKeyValid && profileKeyChanged) || (profileKeyValid && needsProfilePicture)) {
profileManager.setProfileKey(context, recipient, newProfileKey!!)
profileManager.setUnidentifiedAccessMode(context, recipient, Recipient.UnidentifiedAccessMode.UNKNOWN)
profileManager.setProfilePictureURL(context, recipient, profile.profilePictureURL!!)
}
}
}
// Parse quote if needed
@@ -259,8 +276,8 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
}
}
// Parse attachments if needed
val attachments = proto.dataMessage.attachmentsList.mapNotNull { proto ->
val attachment = Attachment.fromProto(proto)
val attachments = proto.dataMessage.attachmentsList.mapNotNull { attachmentProto ->
val attachment = Attachment.fromProto(attachmentProto)
if (!attachment.isValid()) {
return@mapNotNull null
} else {
@@ -269,15 +286,11 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
}
// Persist the message
message.threadID = threadID
val messageID = storage.persist(message, quoteModel, linkPreviews, message.groupPublicKey, openGroupID, attachments) ?: throw MessageReceiver.Error.DuplicateMessage
// Parse & persist attachments
// Start attachment downloads if needed
storage.getAttachmentsForMessage(messageID).iterator().forEach { attachment ->
attachment.attachmentId?.let { id ->
val downloadJob = AttachmentDownloadJob(id.rowId, messageID)
JobQueue.shared.add(downloadJob)
}
}
val messageID = storage.persist(
message, quoteModel, linkPreviews,
message.groupPublicKey, openGroupID,
attachments, runIncrement, runThreadUpdate
) ?: return null
val openGroupServerID = message.openGroupServerMessageID
if (openGroupServerID != null) {
val isSms = !(message.isMediaMessage() || attachments.isNotEmpty())
@@ -285,8 +298,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
}
// Cancel any typing indicators if needed
cancelTypingIndicatorsIfNeeded(message.sender!!)
// Notify the user if needed
SSKEnvironment.shared.notificationManager.updateNotification(context, threadID)
return messageID
}
//endregion

View File

@@ -33,7 +33,7 @@ public class DatabaseAttachment extends Attachment {
@Nullable
public Uri getDataUri() {
if (hasData) {
return MessagingModuleConfiguration.shared.getStorage().getAttachmentDataUri(attachmentId);
return MessagingModuleConfiguration.getShared().getStorage().getAttachmentDataUri(attachmentId);
} else {
return null;
}
@@ -43,7 +43,7 @@ public class DatabaseAttachment extends Attachment {
@Nullable
public Uri getThumbnailUri() {
if (hasThumbnail) {
return MessagingModuleConfiguration.shared.getStorage().getAttachmentThumbnailUri(attachmentId);
return MessagingModuleConfiguration.getShared().getStorage().getAttachmentThumbnailUri(attachmentId);
} else {
return null;
}

View File

@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
import kotlin.math.min
class ClosedGroupPollerV2 {
private val executorService = Executors.newScheduledThreadPool(4)
private val executorService = Executors.newScheduledThreadPool(1)
private var isPolling = mutableMapOf<String, Boolean>()
private var futures = mutableMapOf<String, ScheduledFuture<*>>()

View File

@@ -3,7 +3,13 @@ package org.session.libsession.messaging.sending_receiving.pollers
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.map
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.*
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
import org.session.libsession.messaging.jobs.GroupAvatarDownloadJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.messaging.jobs.OpenGroupDeleteJob
import org.session.libsession.messaging.jobs.TrimThreadJob
import org.session.libsession.messaging.open_groups.OpenGroupAPIV2
import org.session.libsession.messaging.open_groups.OpenGroupMessageV2
import org.session.libsession.utilities.Address
@@ -80,24 +86,32 @@ class OpenGroupPollerV2(private val server: String, private val executorService:
JobQueue.shared.add(BatchMessageReceiveJob(parameters, openGroupID))
}
if (envelopes.isNotEmpty()) {
JobQueue.shared.add(TrimThreadJob(threadId,openGroupID))
}
val indicatedMax = messages.mapNotNull { it.serverID }.maxOrNull() ?: 0
val currentLastMessageServerID = storage.getLastMessageServerID(room, server) ?: 0
val actualMax = max(messages.mapNotNull { it.serverID }.maxOrNull() ?: 0, currentLastMessageServerID)
if (actualMax > 0) {
val actualMax = max(indicatedMax, currentLastMessageServerID)
if (actualMax > 0 && indicatedMax > currentLastMessageServerID) {
storage.setLastMessageServerID(room, server, actualMax)
}
}
private fun handleDeletedMessages(room: String, openGroupID: String, deletions: List<OpenGroupAPIV2.MessageDeletion>) {
val storage = MessagingModuleConfiguration.shared.storage
val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())
val threadID = storage.getThreadId(Address.fromSerialized(groupID)) ?: return
val deletedMessageIDs = deletions.mapNotNull { deletion ->
dataProvider.getMessageID(deletion.deletedMessageServerID, threadID)
val serverIds = deletions.map { deletion ->
deletion.deletedMessageServerID
}
deletedMessageIDs.forEach { (messageId, isSms) ->
MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms)
if (serverIds.isNotEmpty()) {
val deleteJob = OpenGroupDeleteJob(serverIds.toLongArray(), threadID, openGroupID)
JobQueue.shared.add(deleteJob)
}
val currentMax = storage.getLastDeletionServerID(room, server) ?: 0L
val latestMax = deletions.map { it.id }.maxOrNull() ?: 0L
if (latestMax > currentMax && latestMax != 0L) {

View File

@@ -1,7 +1,11 @@
package org.session.libsession.messaging.sending_receiving.pollers
import nl.komponents.kovenant.*
import nl.komponents.kovenant.Deferred
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.resolve
import nl.komponents.kovenant.task
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
import org.session.libsession.messaging.jobs.JobQueue
@@ -11,7 +15,8 @@ import org.session.libsession.snode.SnodeModule
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Snode
import java.security.SecureRandom
import java.util.*
import java.util.Timer
import java.util.TimerTask
private class PromiseCanceledException : Exception("Promise canceled.")
@@ -23,7 +28,8 @@ class Poller {
// region Settings
companion object {
private val retryInterval: Long = 1 * 1000
private const val retryInterval: Long = 2 * 1000
private const val maxInterval: Long = 15 * 1000
}
// endregion
@@ -32,7 +38,7 @@ class Poller {
if (hasStarted) { return }
Log.d("Loki", "Started polling.")
hasStarted = true
setUpPolling()
setUpPolling(retryInterval)
}
fun stopIfNeeded() {
@@ -43,7 +49,7 @@ class Poller {
// endregion
// region Private API
private fun setUpPolling() {
private fun setUpPolling(delay: Long) {
if (!hasStarted) { return; }
val thread = Thread.currentThread()
SnodeAPI.getSwarm(userPublicKey).bind {
@@ -51,13 +57,20 @@ class Poller {
val deferred = deferred<Unit, Exception>()
pollNextSnode(deferred)
deferred.promise
}.always {
}.success {
val nextDelay = if (isCaughtUp) retryInterval else 0
Timer().schedule(object : TimerTask() {
override fun run() {
thread.run { setUpPolling() }
thread.run { setUpPolling(retryInterval) }
}
}, retryInterval)
}, nextDelay)
}.fail {
val nextDelay = minOf(maxInterval, (delay * 1.2).toLong())
Timer().schedule(object : TimerTask() {
override fun run() {
thread.run { setUpPolling(nextDelay) }
}
}, nextDelay)
}
}

View File

@@ -140,13 +140,7 @@ object OnionRequestAPI {
testSnode(candidate).success {
deferred.resolve(candidate)
}.fail {
getGuardSnode().success {
deferred.resolve(candidate)
}.fail { exception ->
if (exception is InsufficientSnodesException) {
deferred.reject(exception)
}
}
deferred.reject(it)
}
return deferred.promise
}

View File

@@ -41,7 +41,7 @@ import kotlin.properties.Delegates.observable
object SnodeAPI {
private val sodium by lazy { LazySodiumAndroid(SodiumAndroid()) }
private val database: LokiAPIDatabaseProtocol
internal val database: LokiAPIDatabaseProtocol
get() = SnodeModule.shared.storage
private val broadcaster: Broadcaster
get() = SnodeModule.shared.broadcaster
@@ -292,7 +292,6 @@ object SnodeAPI {
}
fun getRawMessages(snode: Snode, publicKey: String, requiresAuth: Boolean = true, namespace: Int = 0): RawResponsePromise {
val userED25519KeyPair = MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return Promise.ofFail(Error.NoKeyPair)
// Get last message hash
val lastHashValue = database.getLastMessageHashValue(snode, publicKey, namespace) ?: ""
val parameters = mutableMapOf<String,Any>(
@@ -301,6 +300,12 @@ object SnodeAPI {
)
// Construct signature
if (requiresAuth) {
val userED25519KeyPair = try {
MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return Promise.ofFail(Error.NoKeyPair)
} catch (e: Exception) {
Log.e("Loki", "Error getting KeyPair", e)
return Promise.ofFail(Error.NoKeyPair)
}
val timestamp = Date().time + SnodeAPI.clockOffset
val ed25519PublicKey = userED25519KeyPair.publicKey.asHexString
val signature = ByteArray(Sign.BYTES)
@@ -477,7 +482,7 @@ object SnodeAPI {
return if (messages != null) {
updateLastMessageHashValueIfPossible(snode, publicKey, messages, namespace)
val newRawMessages = removeDuplicates(publicKey, messages, namespace)
return parseEnvelopes(newRawMessages);
return parseEnvelopes(newRawMessages)
} else {
listOf()
}
@@ -494,7 +499,8 @@ object SnodeAPI {
}
private fun removeDuplicates(publicKey: String, rawMessages: List<*>, namespace: Int): List<*> {
val receivedMessageHashValues = database.getReceivedMessageHashValues(publicKey, namespace)?.toMutableSet() ?: mutableSetOf()
val originalMessageHashValues = database.getReceivedMessageHashValues(publicKey, namespace)?.toMutableSet() ?: mutableSetOf()
val receivedMessageHashValues = originalMessageHashValues.toMutableSet()
val result = rawMessages.filter { rawMessage ->
val rawMessageAsJSON = rawMessage as? Map<*, *>
val hashValue = rawMessageAsJSON?.get("hash") as? String
@@ -507,7 +513,9 @@ object SnodeAPI {
false
}
}
database.setReceivedMessageHashValues(publicKey, receivedMessageHashValues, namespace)
if (originalMessageHashValues != receivedMessageHashValues) {
database.setReceivedMessageHashValues(publicKey, receivedMessageHashValues, namespace)
}
return result
}

View File

@@ -14,6 +14,7 @@ import kotlinx.coroutines.flow.asSharedFlow
import org.session.libsession.BuildConfig
import org.session.libsession.R
import org.session.libsession.utilities.TextSecurePreferences.Companion.CALL_NOTIFICATIONS_ENABLED
import org.session.libsession.utilities.TextSecurePreferences.Companion.LAST_VACUUM_TIME
import org.session.libsession.utilities.TextSecurePreferences.Companion.SHOWN_CALL_NOTIFICATION
import org.session.libsession.utilities.TextSecurePreferences.Companion.SHOWN_CALL_WARNING
import org.session.libsignal.utilities.Log
@@ -160,6 +161,8 @@ interface TextSecurePreferences {
fun setShownCallWarning(): Boolean
fun setShownCallNotification(): Boolean
fun isCallNotificationsEnabled(): Boolean
fun getLastVacuum(): Long
fun setLastVacuumNow()
fun clearAll()
companion object {
@@ -240,6 +243,7 @@ interface TextSecurePreferences {
const val CALL_NOTIFICATIONS_ENABLED = "pref_call_notifications_enabled"
const val SHOWN_CALL_WARNING = "pref_shown_call_warning" // call warning is user-facing warning of enabling calls
const val SHOWN_CALL_NOTIFICATION = "pref_shown_call_notification" // call notification is a promp to check privacy settings
const val LAST_VACUUM_TIME = "pref_last_vacuum_time"
@JvmStatic
fun getLastConfigurationSyncTime(context: Context): Long {
@@ -909,6 +913,16 @@ interface TextSecurePreferences {
return previousValue != setValue
}
@JvmStatic
fun getLastVacuumTime(context: Context): Long {
return getLongPreference(context, LAST_VACUUM_TIME, 0)
}
@JvmStatic
fun setLastVacuumNow(context: Context) {
setLongPreference(context, LAST_VACUUM_TIME, System.currentTimeMillis())
}
@JvmStatic
fun clearAll(context: Context) {
getDefaultSharedPreferences(context).edit().clear().commit()
@@ -1469,6 +1483,14 @@ class AppTextSecurePreferences @Inject constructor(
return getBooleanPreference(CALL_NOTIFICATIONS_ENABLED, false)
}
override fun getLastVacuum(): Long {
return getLongPreference(LAST_VACUUM_TIME, 0)
}
override fun setLastVacuumNow() {
setLongPreference(LAST_VACUUM_TIME, System.currentTimeMillis())
}
override fun setShownCallNotification(): Boolean {
val previousValue = getBooleanPreference(SHOWN_CALL_NOTIFICATION, false)
if (previousValue) return false

View File

@@ -0,0 +1,31 @@
package org.session.libsession.utilities
import java.util.Timer
import java.util.TimerTask
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
/**
* Not really a 'debouncer' but named to be similar to the current Debouncer
* designed to queue tasks on a window (if not already queued) like a timer
*/
class WindowDebouncer(private val window: Long, private val timer: Timer) {
private val atomicRef: AtomicReference<Runnable?> = AtomicReference(null)
private val hasStarted = AtomicBoolean(false)
private val recursiveRunnable: TimerTask = object:TimerTask() {
override fun run() {
val runnable = atomicRef.getAndSet(null)
runnable?.run()
}
}
fun publish(runnable: Runnable) {
if (hasStarted.compareAndSet(false, true)) {
timer.scheduleAtFixedRate(recursiveRunnable, 0, window)
}
atomicRef.compareAndSet(null, runnable)
}
}

View File

@@ -302,7 +302,7 @@ public class Recipient implements RecipientModifiedListener {
}
public synchronized @Nullable String getName() {
StorageProtocol storage = MessagingModuleConfiguration.shared.getStorage();
StorageProtocol storage = MessagingModuleConfiguration.getShared().getStorage();
String sessionID = this.address.toString();
if (isGroupRecipient()) {
if (this.name == null) {

View File

@@ -109,7 +109,7 @@ class RecipientProvider {
private @NonNull RecipientDetails getIndividualRecipientDetails(Context context, @NonNull Address address, Optional<RecipientSettings> settings) {
if (!settings.isPresent()) {
settings = Optional.fromNullable(MessagingModuleConfiguration.shared.getStorage().getRecipientSettings(address));
settings = Optional.fromNullable(MessagingModuleConfiguration.getShared().getStorage().getRecipientSettings(address));
}
boolean systemContact = settings.isPresent() && !TextUtils.isEmpty(settings.get().getSystemDisplayName());
@@ -120,12 +120,12 @@ class RecipientProvider {
private @NonNull RecipientDetails getGroupRecipientDetails(Context context, Address groupId, Optional<GroupRecord> groupRecord, Optional<RecipientSettings> settings, boolean asynchronous) {
if (!groupRecord.isPresent()) {
groupRecord = Optional.fromNullable(MessagingModuleConfiguration.shared.getStorage().getGroup(groupId.toGroupString()));
groupRecord = Optional.fromNullable(MessagingModuleConfiguration.getShared().getStorage().getGroup(groupId.toGroupString()));
}
if (!settings.isPresent()) {
settings = Optional.fromNullable(MessagingModuleConfiguration.shared.getStorage().getRecipientSettings(groupId));
settings = Optional.fromNullable(MessagingModuleConfiguration.getShared().getStorage().getRecipientSettings(groupId));
}
if (groupRecord.isPresent()) {