Merge branch 'dev' of https://github.com/oxen-io/session-android into client-side-nickname

This commit is contained in:
Ryan ZHAO
2021-05-13 10:25:52 +10:00
16 changed files with 109 additions and 93 deletions

View File

@@ -45,7 +45,7 @@ interface StorageProtocol {
// Jobs
fun persistJob(job: Job)
fun markJobAsSucceeded(jobId: String)
fun markJobAsFailed(jobId: String)
fun markJobAsFailedPermanently(jobId: String)
fun getAllPendingJobs(type: String): Map<String,Job?>
fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob?
fun getMessageSendJob(messageSendJobID: String): MessageSendJob?

View File

@@ -103,7 +103,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
val messageSendJob = storage.getMessageSendJob(messageSendJobID)
MessageSender.handleFailedMessageSend(this.message, e)
if (messageSendJob != null) {
storage.markJobAsFailed(messageSendJobID)
storage.markJobAsFailedPermanently(messageSendJobID)
}
}
@@ -131,6 +131,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
override fun create(data: Data): AttachmentUploadJob {
val serializedMessage = data.getByteArray(MESSAGE_KEY)
val kryo = Kryo()
kryo.isRegistrationRequired = false
val input = Input(serializedMessage)
val message: Message = kryo.readObject(input, Message::class.java)
input.close()

View File

@@ -12,6 +12,7 @@ interface Job {
// Keys used for database storage
private val ID_KEY = "id"
private val FAILURE_COUNT_KEY = "failure_count"
internal const val MAX_BUFFER_SIZE = 1_000_000 // bytes
}
fun execute()

View File

@@ -5,6 +5,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsignal.utilities.logging.Log
import java.lang.IllegalStateException
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
@@ -17,29 +18,49 @@ import kotlin.math.roundToLong
class JobQueue : JobDelegate {
private var hasResumedPendingJobs = false // Just for debugging
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()
private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val multiDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val attachmentDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
private val scope = GlobalScope + SupervisorJob()
private val queue = Channel<Job>(UNLIMITED)
val timer = Timer()
private fun CoroutineScope.processWithDispatcher(channel: Channel<Job>, dispatcher: CoroutineDispatcher) = launch(dispatcher) {
for (job in channel) {
if (!isActive) break
job.delegate = this@JobQueue
job.execute()
}
}
init {
// Process jobs
scope.launch(dispatcher) {
scope.launch {
val rxQueue = Channel<Job>(capacity = 1024)
val txQueue = Channel<Job>(capacity = 1024)
val attachmentQueue = Channel<Job>(capacity = 1024)
val receiveJob = processWithDispatcher(rxQueue, rxDispatcher)
val txJob = processWithDispatcher(txQueue, txDispatcher)
val attachmentJob = processWithDispatcher(attachmentQueue, attachmentDispatcher)
while (isActive) {
queue.receive().let { job ->
if (job.canExecuteParallel()) {
launch(multiDispatcher) {
job.delegate = this@JobQueue
job.execute()
}
} else {
job.delegate = this@JobQueue
job.execute()
for (job in queue) {
when (job) {
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> txQueue.send(job)
is AttachmentDownloadJob -> attachmentQueue.send(job)
is MessageReceiveJob -> rxQueue.send(job)
else -> throw IllegalStateException("Unexpected job type.")
}
}
}
// The job has been cancelled
receiveJob.cancel()
txJob.cancel()
attachmentJob.cancel()
}
}
@@ -49,13 +70,6 @@ class JobQueue : JobDelegate {
val shared: JobQueue by lazy { JobQueue() }
}
private fun Job.canExecuteParallel(): Boolean {
return this.javaClass in arrayOf(
AttachmentUploadJob::class.java,
AttachmentDownloadJob::class.java
)
}
fun add(job: Job) {
addWithoutExecuting(job)
queue.offer(job) // offer always called on unlimited capacity
@@ -109,10 +123,24 @@ class JobQueue : JobDelegate {
}
override fun handleJobFailed(job: Job, error: Exception) {
job.failureCount += 1
// Canceled
val storage = MessagingModuleConfiguration.shared.storage
if (storage.isJobCanceled(job)) { return Log.i("Loki", "${job::class.simpleName} canceled.")}
if (job.failureCount == job.maxFailureCount) {
if (storage.isJobCanceled(job)) {
return Log.i("Loki", "${job::class.simpleName} canceled.")
}
// Message send jobs waiting for the attachment to upload
if (job is MessageSendJob && error is MessageSendJob.AwaitingAttachmentUploadException) {
val retryInterval: Long = 1000 * 4
Log.i("Loki", "Message send job waiting for attachment upload to finish.")
timer.schedule(delay = retryInterval) {
Log.i("Loki", "Retrying ${job::class.simpleName}.")
queue.offer(job)
}
return
}
// Regular job failure
job.failureCount += 1
if (job.failureCount >= job.maxFailureCount) {
handleJobFailedPermanently(job, error)
} else {
storage.persistJob(job)
@@ -132,7 +160,7 @@ class JobQueue : JobDelegate {
private fun handleJobFailedPermanently(jobId: String) {
val storage = MessagingModuleConfiguration.shared.storage
storage.markJobAsFailed(jobId)
storage.markJobAsFailedPermanently(jobId)
}
private fun getRetryInterval(job: Job): Long {

View File

@@ -4,6 +4,7 @@ import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.Job.Companion.MAX_BUFFER_SIZE
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.visible.VisibleMessage
@@ -11,6 +12,9 @@ import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsignal.utilities.logging.Log
class MessageSendJob(val message: Message, val destination: Destination) : Job {
object AwaitingAttachmentUploadException : Exception("Awaiting attachment upload.")
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
@@ -45,7 +49,10 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
JobQueue.shared.add(job)
}
}
if (attachmentsToUpload.isNotEmpty()) return // Wait for all attachments to upload before continuing
if (attachmentsToUpload.isNotEmpty()) {
this.handleFailure(AwaitingAttachmentUploadException)
return
} // Wait for all attachments to upload before continuing
}
MessageSender.send(this.message, this.destination).success {
this.handleSuccess()
@@ -80,7 +87,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
override fun serialize(): Data {
val kryo = Kryo()
kryo.isRegistrationRequired = false
val output = Output(ByteArray(4096), 10_000_000)
val output = Output(ByteArray(4096), MAX_BUFFER_SIZE)
// Message
kryo.writeClassAndObject(output, message)
output.close()

View File

@@ -78,6 +78,7 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
override fun create(data: Data): NotifyPNServerJob {
val serializedMessage = data.getByteArray(MESSAGE_KEY)
val kryo = Kryo()
kryo.isRegistrationRequired = false
val input = Input(serializedMessage)
val message: SnodeMessage = kryo.readObject(input, SnodeMessage::class.java)
input.close()

View File

@@ -1,7 +1,6 @@
package org.session.libsession.messaging.sending_receiving
import android.text.TextUtils
import okhttp3.HttpUrl
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.AttachmentDownloadJob
import org.session.libsession.messaging.jobs.JobQueue
@@ -156,7 +155,12 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
// Get or create thread
val threadID = storage.getOrCreateThreadIdFor(message.syncTarget
?: message.sender!!, message.groupPublicKey, openGroupID)
?: message.sender!!, message.groupPublicKey, openGroupID)
if (threadID < 0) {
// thread doesn't exist, should only be reached in a case where we are processing open group messages for no longer existent thread
throw MessageReceiver.Error.NoThread
}
val openGroup = threadID.let {
storage.getOpenGroup(it.toString())
@@ -236,7 +240,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
}
val openGroupServerID = message.openGroupServerMessageID
if (openGroupServerID != null) {
storage.setOpenGroupServerMessageID(messageID, openGroupServerID, threadID, !(message.isMediaMessage() || attachments.isNotEmpty()))
storage.setOpenGroupServerMessageID(messageID, openGroupServerID, threadID, !message.isMediaMessage())
}
// Cancel any typing indicators if needed
cancelTypingIndicatorsIfNeeded(message.sender!!)

View File

@@ -6,6 +6,7 @@ import java.security.SecureRandom
* Uses `SecureRandom` to pick an element from this collection.
*/
fun <T> Collection<T>.getRandomElementOrNull(): T? {
if (isEmpty()) return null
val index = SecureRandom().nextInt(size) // SecureRandom() should be cryptographically secure
return elementAtOrNull(index)
}