mirror of
https://github.com/oxen-io/session-android.git
synced 2025-03-27 07:02:14 +00:00
fix: unnamed open groups being processed by creating new threads after deletion
job db not marking successful/unsuccessful properly handling send and receive better / in order
This commit is contained in:
parent
84fa3dfc86
commit
edc1454609
@ -581,7 +581,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
|
||||
val database = DatabaseFactory.getThreadDatabase(context)
|
||||
if (!openGroupID.isNullOrEmpty()) {
|
||||
val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray())), false)
|
||||
return database.getOrCreateThreadIdFor(recipient)
|
||||
return database.getThreadIdIfExistsFor(recipient)
|
||||
} else if (!groupPublicKey.isNullOrEmpty()) {
|
||||
val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.doubleEncodeGroupID(groupPublicKey)), false)
|
||||
return database.getOrCreateThreadIdFor(recipient)
|
||||
|
@ -29,15 +29,15 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
|
||||
contentValues.put(jobType, job.getFactoryKey())
|
||||
contentValues.put(failureCount, job.failureCount)
|
||||
contentValues.put(serializedData, SessionJobHelper.dataSerializer.serialize(job.serialize()))
|
||||
database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID))
|
||||
database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(job.id!!))
|
||||
}
|
||||
|
||||
fun markJobAsSucceeded(jobID: String) {
|
||||
databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf( jobID ))
|
||||
databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID ))
|
||||
}
|
||||
|
||||
fun markJobAsFailed(jobID: String) {
|
||||
databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf( jobID ))
|
||||
databaseHelper.writableDatabase.delete(sessionJobTable, "${Companion.jobID} = ?", arrayOf( jobID ))
|
||||
}
|
||||
|
||||
fun getAllPendingJobs(type: String): Map<String, Job?> {
|
||||
@ -75,7 +75,7 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa
|
||||
var cursor: android.database.Cursor? = null
|
||||
try {
|
||||
cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id ))
|
||||
return cursor != null && cursor.moveToFirst()
|
||||
return cursor == null || !cursor.moveToFirst()
|
||||
} catch (e: Exception) {
|
||||
// Do nothing
|
||||
} finally {
|
||||
|
@ -17,29 +17,50 @@ import kotlin.math.roundToLong
|
||||
class JobQueue : JobDelegate {
|
||||
private var hasResumedPendingJobs = false // Just for debugging
|
||||
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()
|
||||
private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||
private val multiDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
|
||||
private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||
private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||
private val attachmentDispatcher = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
|
||||
private val scope = GlobalScope + SupervisorJob()
|
||||
private val queue = Channel<Job>(UNLIMITED)
|
||||
|
||||
val timer = Timer()
|
||||
|
||||
private fun CoroutineScope.processWithDispatcher(channel: Channel<Job>, dispatcher: CoroutineDispatcher) = launch(dispatcher) {
|
||||
for (job in channel) {
|
||||
if (!isActive) break
|
||||
job.delegate = this@JobQueue
|
||||
job.execute()
|
||||
}
|
||||
}
|
||||
|
||||
init {
|
||||
// Process jobs
|
||||
scope.launch(dispatcher) {
|
||||
scope.launch {
|
||||
val rxQueue = Channel<Job>(capacity = 1024)
|
||||
val txQueue = Channel<Job>(capacity = 1024)
|
||||
val attachmentQueue = Channel<Job>(capacity = 1024)
|
||||
|
||||
val receiveJob = processWithDispatcher(rxQueue, rxDispatcher)
|
||||
val txJob = processWithDispatcher(txQueue, txDispatcher)
|
||||
val attachmentJob = processWithDispatcher(attachmentQueue, attachmentDispatcher)
|
||||
|
||||
while (isActive) {
|
||||
queue.receive().let { job ->
|
||||
if (job.canExecuteParallel()) {
|
||||
launch(multiDispatcher) {
|
||||
job.delegate = this@JobQueue
|
||||
job.execute()
|
||||
}
|
||||
} else {
|
||||
job.delegate = this@JobQueue
|
||||
job.execute()
|
||||
for (job in queue) {
|
||||
when (job) {
|
||||
is NotifyPNServerJob,
|
||||
is AttachmentUploadJob,
|
||||
is MessageSendJob -> txQueue.send(job)
|
||||
is AttachmentDownloadJob -> attachmentQueue.send(job)
|
||||
else -> rxQueue.send(job)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// job has been cancelled
|
||||
receiveJob.cancel()
|
||||
txJob.cancel()
|
||||
attachmentJob.cancel()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -49,14 +70,6 @@ class JobQueue : JobDelegate {
|
||||
val shared: JobQueue by lazy { JobQueue() }
|
||||
}
|
||||
|
||||
private fun Job.canExecuteParallel(): Boolean {
|
||||
return this.javaClass in arrayOf(
|
||||
MessageSendJob::class.java,
|
||||
AttachmentUploadJob::class.java,
|
||||
AttachmentDownloadJob::class.java
|
||||
)
|
||||
}
|
||||
|
||||
fun add(job: Job) {
|
||||
addWithoutExecuting(job)
|
||||
queue.offer(job) // offer always called on unlimited capacity
|
||||
@ -112,8 +125,10 @@ class JobQueue : JobDelegate {
|
||||
override fun handleJobFailed(job: Job, error: Exception) {
|
||||
job.failureCount += 1
|
||||
val storage = MessagingModuleConfiguration.shared.storage
|
||||
if (storage.isJobCanceled(job)) { return Log.i("Loki", "${job::class.simpleName} canceled.")}
|
||||
if (job.failureCount == job.maxFailureCount) {
|
||||
if (storage.isJobCanceled(job)) {
|
||||
return Log.i("Loki", "${job::class.simpleName} canceled.")
|
||||
}
|
||||
if (job.failureCount >= job.maxFailureCount) {
|
||||
handleJobFailedPermanently(job, error)
|
||||
} else {
|
||||
storage.persistJob(job)
|
||||
|
@ -12,6 +12,9 @@ import org.session.libsession.messaging.sending_receiving.MessageSender
|
||||
import org.session.libsignal.utilities.logging.Log
|
||||
|
||||
class MessageSendJob(val message: Message, val destination: Destination) : Job {
|
||||
|
||||
object AwaitingUploadException: Exception("Awaiting attachment upload")
|
||||
|
||||
override var delegate: JobDelegate? = null
|
||||
override var id: String? = null
|
||||
override var failureCount: Int = 0
|
||||
@ -46,7 +49,10 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
|
||||
JobQueue.shared.add(job)
|
||||
}
|
||||
}
|
||||
if (attachmentsToUpload.isNotEmpty()) return // Wait for all attachments to upload before continuing
|
||||
if (attachmentsToUpload.isNotEmpty()) {
|
||||
this.handleFailure(AwaitingUploadException)
|
||||
return
|
||||
} // Wait for all attachments to upload before continuing
|
||||
}
|
||||
MessageSender.send(this.message, this.destination).success {
|
||||
this.handleSuccess()
|
||||
|
@ -155,6 +155,11 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
|
||||
val threadID = storage.getOrCreateThreadIdFor(message.syncTarget
|
||||
?: message.sender!!, message.groupPublicKey, openGroupID)
|
||||
|
||||
if (threadID < 0) {
|
||||
// thread doesn't exist, should only be reached in a case where we are processing open group messages for no longer existent thread
|
||||
throw MessageReceiver.Error.NoThread
|
||||
}
|
||||
|
||||
val openGroup = threadID.let {
|
||||
storage.getOpenGroup(it.toString())
|
||||
}
|
||||
@ -233,7 +238,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
|
||||
}
|
||||
val openGroupServerID = message.openGroupServerMessageID
|
||||
if (openGroupServerID != null) {
|
||||
storage.setOpenGroupServerMessageID(messageID, openGroupServerID, threadID, !(message.isMediaMessage() || attachments.isNotEmpty()))
|
||||
storage.setOpenGroupServerMessageID(messageID, openGroupServerID, threadID, !message.isMediaMessage())
|
||||
}
|
||||
// Cancel any typing indicators if needed
|
||||
cancelTypingIndicatorsIfNeeded(message.sender!!)
|
||||
|
Loading…
x
Reference in New Issue
Block a user