mirror of
https://github.com/oxen-io/session-android.git
synced 2024-11-30 13:35:18 +00:00
Merge pull request #532 from hjubb/background_job_improvements
Background job improvements
This commit is contained in:
commit
eb58872b93
@ -327,7 +327,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc
|
|||||||
.setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(this)))
|
.setJobStorage(new FastJobStorage(DatabaseFactory.getJobDatabase(this)))
|
||||||
.setDependencyInjector(this)
|
.setDependencyInjector(this)
|
||||||
.build());
|
.build());
|
||||||
JobQueue.getShared().resumePendingJobs();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initializeDependencyInjection() {
|
private void initializeDependencyInjection() {
|
||||||
@ -455,7 +454,6 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc
|
|||||||
poller.setUserPublicKey(userPublicKey);
|
poller.setUserPublicKey(userPublicKey);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LokiAPIDatabase apiDB = DatabaseFactory.getLokiAPIDatabase(this);
|
|
||||||
poller = new Poller();
|
poller = new Poller();
|
||||||
closedGroupPoller = new ClosedGroupPoller();
|
closedGroupPoller = new ClosedGroupPoller();
|
||||||
}
|
}
|
||||||
|
@ -581,7 +581,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
|
|||||||
val database = DatabaseFactory.getThreadDatabase(context)
|
val database = DatabaseFactory.getThreadDatabase(context)
|
||||||
if (!openGroupID.isNullOrEmpty()) {
|
if (!openGroupID.isNullOrEmpty()) {
|
||||||
val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())), false)
|
val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())), false)
|
||||||
return database.getOrCreateThreadIdFor(recipient)
|
return database.getThreadIdIfExistsFor(recipient)
|
||||||
} else if (!groupPublicKey.isNullOrEmpty()) {
|
} else if (!groupPublicKey.isNullOrEmpty()) {
|
||||||
val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.doubleEncodeGroupID(groupPublicKey)), false)
|
val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.doubleEncodeGroupID(groupPublicKey)), false)
|
||||||
return database.getOrCreateThreadIdFor(recipient)
|
return database.getOrCreateThreadIdFor(recipient)
|
||||||
|
@ -31,7 +31,7 @@ class PublicChatManager(private val context: Context) {
|
|||||||
refreshChatsAndPollers()
|
refreshChatsAndPollers()
|
||||||
for ((threadID, _) in chats) {
|
for ((threadID, _) in chats) {
|
||||||
val poller = pollers[threadID]
|
val poller = pollers[threadID]
|
||||||
areAllCaughtUp = if (poller != null) areAllCaughtUp && poller.isCaughtUp else true
|
areAllCaughtUp = if (poller != null) areAllCaughtUp && poller.isCaughtUp else areAllCaughtUp
|
||||||
}
|
}
|
||||||
return areAllCaughtUp
|
return areAllCaughtUp
|
||||||
}
|
}
|
||||||
@ -42,6 +42,9 @@ class PublicChatManager(private val context: Context) {
|
|||||||
val poller = pollers[threadID] ?: OpenGroupPoller(chat, executorService)
|
val poller = pollers[threadID] ?: OpenGroupPoller(chat, executorService)
|
||||||
poller.isCaughtUp = false
|
poller.isCaughtUp = false
|
||||||
}
|
}
|
||||||
|
for ((_,poller) in v2Pollers) {
|
||||||
|
poller.isCaughtUp = false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public fun startPollersIfNeeded() {
|
public fun startPollersIfNeeded() {
|
||||||
|
@ -23,7 +23,6 @@ class SessionProtocolImpl(private val context: Context) : SessionProtocol {
|
|||||||
override fun decrypt(ciphertext: ByteArray, x25519KeyPair: ECKeyPair): Pair<ByteArray, String> {
|
override fun decrypt(ciphertext: ByteArray, x25519KeyPair: ECKeyPair): Pair<ByteArray, String> {
|
||||||
val recipientX25519PrivateKey = x25519KeyPair.privateKey.serialize()
|
val recipientX25519PrivateKey = x25519KeyPair.privateKey.serialize()
|
||||||
val recipientX25519PublicKey = Hex.fromStringCondensed(x25519KeyPair.hexEncodedPublicKey.removing05PrefixIfNeeded())
|
val recipientX25519PublicKey = Hex.fromStringCondensed(x25519KeyPair.hexEncodedPublicKey.removing05PrefixIfNeeded())
|
||||||
Log.d("Test", "recipientX25519PublicKey: $recipientX25519PublicKey")
|
|
||||||
val signatureSize = Sign.BYTES
|
val signatureSize = Sign.BYTES
|
||||||
val ed25519PublicKeySize = Sign.PUBLICKEYBYTES
|
val ed25519PublicKeySize = Sign.PUBLICKEYBYTES
|
||||||
|
|
||||||
|
@ -29,15 +29,15 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
|
|||||||
contentValues.put(jobType, job.getFactoryKey())
|
contentValues.put(jobType, job.getFactoryKey())
|
||||||
contentValues.put(failureCount, job.failureCount)
|
contentValues.put(failureCount, job.failureCount)
|
||||||
contentValues.put(serializedData, SessionJobHelper.dataSerializer.serialize(job.serialize()))
|
contentValues.put(serializedData, SessionJobHelper.dataSerializer.serialize(job.serialize()))
|
||||||
database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID))
|
database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(job.id!!))
|
||||||
}
|
}
|
||||||
|
|
||||||
fun markJobAsSucceeded(jobID: String) {
|
fun markJobAsSucceeded(jobID: String) {
|
||||||
databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf( jobID ))
|
databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID ))
|
||||||
}
|
}
|
||||||
|
|
||||||
fun markJobAsFailed(jobID: String) {
|
fun markJobAsFailed(jobID: String) {
|
||||||
databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf( jobID ))
|
databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID ))
|
||||||
}
|
}
|
||||||
|
|
||||||
fun getAllPendingJobs(type: String): Map<String, Job?> {
|
fun getAllPendingJobs(type: String): Map<String, Job?> {
|
||||||
@ -75,7 +75,7 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
|
|||||||
var cursor: android.database.Cursor? = null
|
var cursor: android.database.Cursor? = null
|
||||||
try {
|
try {
|
||||||
cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id ))
|
cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id ))
|
||||||
return cursor != null && cursor.moveToFirst()
|
return cursor == null || !cursor.moveToFirst()
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -131,6 +131,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
|
|||||||
override fun create(data: Data): AttachmentUploadJob {
|
override fun create(data: Data): AttachmentUploadJob {
|
||||||
val serializedMessage = data.getByteArray(MESSAGE_KEY)
|
val serializedMessage = data.getByteArray(MESSAGE_KEY)
|
||||||
val kryo = Kryo()
|
val kryo = Kryo()
|
||||||
|
kryo.isRegistrationRequired = false
|
||||||
val input = Input(serializedMessage)
|
val input = Input(serializedMessage)
|
||||||
val message: Message = kryo.readObject(input, Message::class.java)
|
val message: Message = kryo.readObject(input, Message::class.java)
|
||||||
input.close()
|
input.close()
|
||||||
|
@ -12,6 +12,7 @@ interface Job {
|
|||||||
// Keys used for database storage
|
// Keys used for database storage
|
||||||
private val ID_KEY = "id"
|
private val ID_KEY = "id"
|
||||||
private val FAILURE_COUNT_KEY = "failure_count"
|
private val FAILURE_COUNT_KEY = "failure_count"
|
||||||
|
internal const val MAX_BUFFER_SIZE = 1_000_000 // bytes
|
||||||
}
|
}
|
||||||
|
|
||||||
fun execute()
|
fun execute()
|
||||||
|
@ -17,29 +17,50 @@ import kotlin.math.roundToLong
|
|||||||
class JobQueue : JobDelegate {
|
class JobQueue : JobDelegate {
|
||||||
private var hasResumedPendingJobs = false // Just for debugging
|
private var hasResumedPendingJobs = false // Just for debugging
|
||||||
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()
|
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()
|
||||||
private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||||
private val multiDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
|
private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||||
|
private val attachmentDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
|
||||||
private val scope = GlobalScope + SupervisorJob()
|
private val scope = GlobalScope + SupervisorJob()
|
||||||
private val queue = Channel<Job>(UNLIMITED)
|
private val queue = Channel<Job>(UNLIMITED)
|
||||||
|
|
||||||
val timer = Timer()
|
val timer = Timer()
|
||||||
|
|
||||||
|
private fun CoroutineScope.processWithDispatcher(channel: Channel<Job>, dispatcher: CoroutineDispatcher) = launch(dispatcher) {
|
||||||
|
for (job in channel) {
|
||||||
|
if (!isActive) break
|
||||||
|
job.delegate = this@JobQueue
|
||||||
|
job.execute()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
init {
|
init {
|
||||||
// Process jobs
|
// Process jobs
|
||||||
scope.launch(dispatcher) {
|
scope.launch {
|
||||||
|
val rxQueue = Channel<Job>(capacity = 1024)
|
||||||
|
val txQueue = Channel<Job>(capacity = 1024)
|
||||||
|
val attachmentQueue = Channel<Job>(capacity = 1024)
|
||||||
|
|
||||||
|
val receiveJob = processWithDispatcher(rxQueue, rxDispatcher)
|
||||||
|
val txJob = processWithDispatcher(txQueue, txDispatcher)
|
||||||
|
val attachmentJob = processWithDispatcher(attachmentQueue, attachmentDispatcher)
|
||||||
|
|
||||||
while (isActive) {
|
while (isActive) {
|
||||||
queue.receive().let { job ->
|
for (job in queue) {
|
||||||
if (job.canExecuteParallel()) {
|
when (job) {
|
||||||
launch(multiDispatcher) {
|
is NotifyPNServerJob,
|
||||||
job.delegate = this@JobQueue
|
is AttachmentUploadJob,
|
||||||
job.execute()
|
is MessageSendJob -> txQueue.send(job)
|
||||||
}
|
is AttachmentDownloadJob -> attachmentQueue.send(job)
|
||||||
} else {
|
else -> rxQueue.send(job)
|
||||||
job.delegate = this@JobQueue
|
|
||||||
job.execute()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// job has been cancelled
|
||||||
|
receiveJob.cancel()
|
||||||
|
txJob.cancel()
|
||||||
|
attachmentJob.cancel()
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -49,13 +70,6 @@ class JobQueue : JobDelegate {
|
|||||||
val shared: JobQueue by lazy { JobQueue() }
|
val shared: JobQueue by lazy { JobQueue() }
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun Job.canExecuteParallel(): Boolean {
|
|
||||||
return this.javaClass in arrayOf(
|
|
||||||
AttachmentUploadJob::class.java,
|
|
||||||
AttachmentDownloadJob::class.java
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun add(job: Job) {
|
fun add(job: Job) {
|
||||||
addWithoutExecuting(job)
|
addWithoutExecuting(job)
|
||||||
queue.offer(job) // offer always called on unlimited capacity
|
queue.offer(job) // offer always called on unlimited capacity
|
||||||
@ -111,8 +125,10 @@ class JobQueue : JobDelegate {
|
|||||||
override fun handleJobFailed(job: Job, error: Exception) {
|
override fun handleJobFailed(job: Job, error: Exception) {
|
||||||
job.failureCount += 1
|
job.failureCount += 1
|
||||||
val storage = MessagingModuleConfiguration.shared.storage
|
val storage = MessagingModuleConfiguration.shared.storage
|
||||||
if (storage.isJobCanceled(job)) { return Log.i("Loki", "${job::class.simpleName} canceled.")}
|
if (storage.isJobCanceled(job)) {
|
||||||
if (job.failureCount == job.maxFailureCount) {
|
return Log.i("Loki", "${job::class.simpleName} canceled.")
|
||||||
|
}
|
||||||
|
if (job.failureCount >= job.maxFailureCount) {
|
||||||
handleJobFailedPermanently(job, error)
|
handleJobFailedPermanently(job, error)
|
||||||
} else {
|
} else {
|
||||||
storage.persistJob(job)
|
storage.persistJob(job)
|
||||||
|
@ -4,6 +4,7 @@ import com.esotericsoftware.kryo.Kryo
|
|||||||
import com.esotericsoftware.kryo.io.Input
|
import com.esotericsoftware.kryo.io.Input
|
||||||
import com.esotericsoftware.kryo.io.Output
|
import com.esotericsoftware.kryo.io.Output
|
||||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||||
|
import org.session.libsession.messaging.jobs.Job.Companion.MAX_BUFFER_SIZE
|
||||||
import org.session.libsession.messaging.messages.Destination
|
import org.session.libsession.messaging.messages.Destination
|
||||||
import org.session.libsession.messaging.messages.Message
|
import org.session.libsession.messaging.messages.Message
|
||||||
import org.session.libsession.messaging.messages.visible.VisibleMessage
|
import org.session.libsession.messaging.messages.visible.VisibleMessage
|
||||||
@ -11,6 +12,9 @@ import org.session.libsession.messaging.sending_receiving.MessageSender
|
|||||||
import org.session.libsignal.utilities.logging.Log
|
import org.session.libsignal.utilities.logging.Log
|
||||||
|
|
||||||
class MessageSendJob(val message: Message, val destination: Destination) : Job {
|
class MessageSendJob(val message: Message, val destination: Destination) : Job {
|
||||||
|
|
||||||
|
object AwaitingUploadException: Exception("Awaiting attachment upload")
|
||||||
|
|
||||||
override var delegate: JobDelegate? = null
|
override var delegate: JobDelegate? = null
|
||||||
override var id: String? = null
|
override var id: String? = null
|
||||||
override var failureCount: Int = 0
|
override var failureCount: Int = 0
|
||||||
@ -45,7 +49,10 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
|
|||||||
JobQueue.shared.add(job)
|
JobQueue.shared.add(job)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (attachmentsToUpload.isNotEmpty()) return // Wait for all attachments to upload before continuing
|
if (attachmentsToUpload.isNotEmpty()) {
|
||||||
|
this.handleFailure(AwaitingUploadException)
|
||||||
|
return
|
||||||
|
} // Wait for all attachments to upload before continuing
|
||||||
}
|
}
|
||||||
MessageSender.send(this.message, this.destination).success {
|
MessageSender.send(this.message, this.destination).success {
|
||||||
this.handleSuccess()
|
this.handleSuccess()
|
||||||
@ -80,7 +87,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
|
|||||||
override fun serialize(): Data {
|
override fun serialize(): Data {
|
||||||
val kryo = Kryo()
|
val kryo = Kryo()
|
||||||
kryo.isRegistrationRequired = false
|
kryo.isRegistrationRequired = false
|
||||||
val output = Output(ByteArray(4096), 10_000_000)
|
val output = Output(ByteArray(4096), MAX_BUFFER_SIZE)
|
||||||
// Message
|
// Message
|
||||||
kryo.writeClassAndObject(output, message)
|
kryo.writeClassAndObject(output, message)
|
||||||
output.close()
|
output.close()
|
||||||
|
@ -78,6 +78,7 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
|
|||||||
override fun create(data: Data): NotifyPNServerJob {
|
override fun create(data: Data): NotifyPNServerJob {
|
||||||
val serializedMessage = data.getByteArray(MESSAGE_KEY)
|
val serializedMessage = data.getByteArray(MESSAGE_KEY)
|
||||||
val kryo = Kryo()
|
val kryo = Kryo()
|
||||||
|
kryo.isRegistrationRequired = false
|
||||||
val input = Input(serializedMessage)
|
val input = Input(serializedMessage)
|
||||||
val message: SnodeMessage = kryo.readObject(input, SnodeMessage::class.java)
|
val message: SnodeMessage = kryo.readObject(input, SnodeMessage::class.java)
|
||||||
input.close()
|
input.close()
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package org.session.libsession.messaging.sending_receiving
|
package org.session.libsession.messaging.sending_receiving
|
||||||
|
|
||||||
import android.text.TextUtils
|
import android.text.TextUtils
|
||||||
import okhttp3.HttpUrl
|
|
||||||
import org.session.libsession.messaging.MessagingModuleConfiguration
|
import org.session.libsession.messaging.MessagingModuleConfiguration
|
||||||
import org.session.libsession.messaging.jobs.AttachmentDownloadJob
|
import org.session.libsession.messaging.jobs.AttachmentDownloadJob
|
||||||
import org.session.libsession.messaging.jobs.JobQueue
|
import org.session.libsession.messaging.jobs.JobQueue
|
||||||
@ -156,6 +155,11 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
|
|||||||
val threadID = storage.getOrCreateThreadIdFor(message.syncTarget
|
val threadID = storage.getOrCreateThreadIdFor(message.syncTarget
|
||||||
?: message.sender!!, message.groupPublicKey, openGroupID)
|
?: message.sender!!, message.groupPublicKey, openGroupID)
|
||||||
|
|
||||||
|
if (threadID < 0) {
|
||||||
|
// thread doesn't exist, should only be reached in a case where we are processing open group messages for no longer existent thread
|
||||||
|
throw MessageReceiver.Error.NoThread
|
||||||
|
}
|
||||||
|
|
||||||
val openGroup = threadID.let {
|
val openGroup = threadID.let {
|
||||||
storage.getOpenGroup(it.toString())
|
storage.getOpenGroup(it.toString())
|
||||||
}
|
}
|
||||||
@ -234,7 +238,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
|
|||||||
}
|
}
|
||||||
val openGroupServerID = message.openGroupServerMessageID
|
val openGroupServerID = message.openGroupServerMessageID
|
||||||
if (openGroupServerID != null) {
|
if (openGroupServerID != null) {
|
||||||
storage.setOpenGroupServerMessageID(messageID, openGroupServerID, threadID, !(message.isMediaMessage() || attachments.isNotEmpty()))
|
storage.setOpenGroupServerMessageID(messageID, openGroupServerID, threadID, !message.isMediaMessage())
|
||||||
}
|
}
|
||||||
// Cancel any typing indicators if needed
|
// Cancel any typing indicators if needed
|
||||||
cancelTypingIndicatorsIfNeeded(message.sender!!)
|
cancelTypingIndicatorsIfNeeded(message.sender!!)
|
||||||
|
@ -6,6 +6,7 @@ import java.security.SecureRandom
|
|||||||
* Uses `SecureRandom` to pick an element from this collection.
|
* Uses `SecureRandom` to pick an element from this collection.
|
||||||
*/
|
*/
|
||||||
fun <T> Collection<T>.getRandomElementOrNull(): T? {
|
fun <T> Collection<T>.getRandomElementOrNull(): T? {
|
||||||
|
if (isEmpty()) return null
|
||||||
val index = SecureRandom().nextInt(size) // SecureRandom() should be cryptographically secure
|
val index = SecureRandom().nextInt(size) // SecureRandom() should be cryptographically secure
|
||||||
return elementAtOrNull(index)
|
return elementAtOrNull(index)
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,7 @@ import java.security.SecureRandom
|
|||||||
* Uses `SecureRandom` to pick an element from this collection.
|
* Uses `SecureRandom` to pick an element from this collection.
|
||||||
*/
|
*/
|
||||||
fun <T> Collection<T>.getRandomElementOrNull(): T? {
|
fun <T> Collection<T>.getRandomElementOrNull(): T? {
|
||||||
|
if (isEmpty()) return null
|
||||||
val index = SecureRandom().nextInt(size) // SecureRandom() should be cryptographically secure
|
val index = SecureRandom().nextInt(size) // SecureRandom() should be cryptographically secure
|
||||||
return elementAtOrNull(index)
|
return elementAtOrNull(index)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user