diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt index 7b4f2c2aa6..a8d00d690c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt @@ -75,6 +75,7 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor val userPublicKey = TextSecurePreferences.getLocalNumber(context)!! val privateChatsPromise = SnodeAPI.getMessages(userPublicKey).map { envelopes -> envelopes.map { envelope -> + // FIXME: Using a job here seems like a bad idea... MessageReceiveJob(envelope.toByteArray(), false).executeAsync() } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt index 04edd3715e..3647a9937c 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt @@ -18,7 +18,8 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa const val jobType = "job_type" const val failureCount = "failure_count" const val serializedData = "serialized_data" - @JvmStatic val createSessionJobTableCommand = "CREATE TABLE $sessionJobTable ($jobID INTEGER PRIMARY KEY, $jobType STRING, $failureCount INTEGER DEFAULT 0, $serializedData TEXT);" + @JvmStatic val createSessionJobTableCommand + = "CREATE TABLE $sessionJobTable ($jobID INTEGER PRIMARY KEY, $jobType STRING, $failureCount INTEGER DEFAULT 0, $serializedData TEXT);" } fun persistJob(job: Job) { @@ -31,40 +32,41 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID)) } - fun markJobAsSucceeded(jobId: String) { - databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(jobId)) + fun markJobAsSucceeded(jobID: String) { + databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf( jobID )) } - fun markJobAsFailed(jobId: String) { - databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(jobId)) + fun markJobAsFailed(jobID: String) { + databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf( jobID )) } fun getAllPendingJobs(type: String): Map { val database = databaseHelper.readableDatabase - return database.getAll(sessionJobTable, "$jobType = ?", arrayOf(type)) { cursor -> - val jobId = cursor.getString(jobID) + return database.getAll(sessionJobTable, "$jobType = ?", arrayOf( type )) { cursor -> + val jobID = cursor.getString(jobID) try { - jobId to jobFromCursor(cursor) + jobID to jobFromCursor(cursor) } catch (e: Exception) { - Log.e("Loki", "Error serializing Job of type $type",e) - jobId to null + Log.e("Loki", "Error deserializing job of type: $type.", e) + jobID to null } }.toMap() } fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? { val database = databaseHelper.readableDatabase - var result = mutableListOf() - database.getAll(sessionJobTable, "$jobType = ?", arrayOf(AttachmentUploadJob.KEY)) { cursor -> - result.add(jobFromCursor(cursor) as AttachmentUploadJob) + val result = mutableListOf() + database.getAll(sessionJobTable, "$jobType = ?", arrayOf( AttachmentUploadJob.KEY )) { cursor -> + val job = jobFromCursor(cursor) as AttachmentUploadJob? + if (job != null) { result.add(job) } } return result.firstOrNull { job -> job.attachmentID == attachmentID } } fun getMessageSendJob(messageSendJobID: String): MessageSendJob? { val database = databaseHelper.readableDatabase - return database.get(sessionJobTable, "$jobID = ? AND $jobType = ?", arrayOf(messageSendJobID, MessageSendJob.KEY)) { cursor -> - jobFromCursor(cursor) as MessageSendJob + return database.get(sessionJobTable, "$jobID = ? AND $jobType = ?", arrayOf( messageSendJobID, MessageSendJob.KEY )) { cursor -> + jobFromCursor(cursor) as MessageSendJob? } } @@ -72,7 +74,7 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa val database = databaseHelper.readableDatabase var cursor: android.database.Cursor? = null try { - cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf(job.id)) + cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf( job.id )) return cursor != null && cursor.moveToFirst() } catch (e: Exception) { // Do nothing @@ -82,10 +84,10 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa return false } - private fun jobFromCursor(cursor: Cursor): Job { + private fun jobFromCursor(cursor: Cursor): Job? { val type = cursor.getString(jobType) val data = SessionJobHelper.dataSerializer.deserialize(cursor.getString(serializedData)) - val job = SessionJobHelper.sessionJobInstantiator.instantiate(type, data) + val job = SessionJobHelper.sessionJobInstantiator.instantiate(type, data) ?: return null job.id = cursor.getString(jobID) job.failureCount = cursor.getInt(failureCount) return job diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt index 6855b08b02..8285813206 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentDownloadJob.kt @@ -3,7 +3,6 @@ package org.session.libsession.messaging.jobs import okhttp3.HttpUrl import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.file_server.FileServerAPI -import org.session.libsession.messaging.file_server.FileServerAPIV2 import org.session.libsession.messaging.open_groups.OpenGroupAPIV2 import org.session.libsession.messaging.sending_receiving.attachments.AttachmentState import org.session.libsession.messaging.utilities.DotNetAPI @@ -31,8 +30,8 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) val KEY: String = "AttachmentDownloadJob" // Keys used for database storage - private val KEY_ATTACHMENT_ID = "attachment_id" - private val KEY_TS_INCOMING_MESSAGE_ID = "tsIncoming_message_id" + private val ATTACHMENT_ID_KEY = "attachment_id" + private val TS_INCOMING_MESSAGE_ID_KEY = "tsIncoming_message_id" } override fun execute() { @@ -52,18 +51,19 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) try { val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider val attachment = messageDataProvider.getDatabaseAttachment(attachmentID) - ?: return handleFailure(Error.NoAttachment) + ?: return handleFailure(Error.NoAttachment) messageDataProvider.setAttachmentState(AttachmentState.STARTED, attachmentID, this.databaseMessageID) val tempFile = createTempFile() - val threadId = MessagingModuleConfiguration.shared.storage.getThreadIdForMms(databaseMessageID) val openGroupV2 = MessagingModuleConfiguration.shared.storage.getV2OpenGroup(threadId.toString()) - val stream = if (openGroupV2 == null) { DownloadUtilities.downloadFile(tempFile, attachment.url, FileServerAPI.maxFileSize, null) // Assume we're retrieving an attachment for an open group server if the digest is not set - if (attachment.digest?.size ?: 0 == 0 || attachment.key.isNullOrEmpty()) FileInputStream(tempFile) - else AttachmentCipherInputStream.createForAttachment(tempFile, attachment.size, Base64.decode(attachment.key), attachment.digest) + if (attachment.digest?.size ?: 0 == 0 || attachment.key.isNullOrEmpty()) { + FileInputStream(tempFile) + } else { + AttachmentCipherInputStream.createForAttachment(tempFile, attachment.size, Base64.decode(attachment.key), attachment.digest) + } } else { val url = HttpUrl.parse(attachment.url)!! val fileId = url.pathSegments().last() @@ -100,8 +100,9 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) } override fun serialize(): Data { - return Data.Builder().putLong(KEY_ATTACHMENT_ID, attachmentID) - .putLong(KEY_TS_INCOMING_MESSAGE_ID, databaseMessageID) + return Data.Builder() + .putLong(ATTACHMENT_ID_KEY, attachmentID) + .putLong(TS_INCOMING_MESSAGE_ID_KEY, databaseMessageID) .build(); } @@ -110,8 +111,9 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long) } class Factory : Job.Factory { + override fun create(data: Data): AttachmentDownloadJob { - return AttachmentDownloadJob(data.getLong(KEY_ATTACHMENT_ID), data.getLong(KEY_TS_INCOMING_MESSAGE_ID)) + return AttachmentDownloadJob(data.getLong(ATTACHMENT_ID_KEY), data.getLong(TS_INCOMING_MESSAGE_ID_KEY)) } } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt index 1d8b1a7170..e0849d1daf 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt @@ -30,44 +30,39 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess // Settings override val maxFailureCount: Int = 20 + companion object { val TAG = AttachmentUploadJob::class.simpleName val KEY: String = "AttachmentUploadJob" // Keys used for database storage - private val KEY_ATTACHMENT_ID = "attachment_id" - private val KEY_THREAD_ID = "thread_id" - private val KEY_MESSAGE = "message" - private val KEY_MESSAGE_SEND_JOB_ID = "message_send_job_id" + private val ATTACHMENT_ID_KEY = "attachment_id" + private val THREAD_ID_KEY = "thread_id" + private val MESSAGE_KEY = "message" + private val MESSAGE_SEND_JOB_ID_KEY = "message_send_job_id" } override fun execute() { try { val attachment = MessagingModuleConfiguration.shared.messageDataProvider.getScaledSignalAttachmentStream(attachmentID) ?: return handleFailure(Error.NoAttachment) - val usePadding = false val openGroupV2 = MessagingModuleConfiguration.shared.storage.getV2OpenGroup(threadID) val openGroup = MessagingModuleConfiguration.shared.storage.getOpenGroup(threadID) - val server = openGroup?.let { - it.server - } ?: openGroupV2?.let { - it.server - } ?: FileServerAPI.shared.server + val server = openGroupV2?.server ?: openGroup?.server ?: FileServerAPI.shared.server val shouldEncrypt = (openGroup == null && openGroupV2 == null) // Encrypt if this isn't an open group - val attachmentKey = Util.getSecretBytes(64) val paddedLength = if (usePadding) PaddingInputStream.getPaddedSize(attachment.length) else attachment.length val dataStream = if (usePadding) PaddingInputStream(attachment.inputStream, attachment.length) else attachment.inputStream val ciphertextLength = if (shouldEncrypt) AttachmentCipherOutputStream.getCiphertextLength(paddedLength) else attachment.length - val outputStreamFactory = if (shouldEncrypt) AttachmentCipherOutputStreamFactory(attachmentKey) else PlaintextOutputStreamFactory() val attachmentData = PushAttachmentData(attachment.contentType, dataStream, ciphertextLength, outputStreamFactory, attachment.listener) - - val uploadResult = if (openGroupV2 == null) FileServerAPI.shared.uploadAttachment(server, attachmentData) else { + val uploadResult = if (openGroupV2 != null) { val dataBytes = attachmentData.data.readBytes() val result = OpenGroupAPIV2.upload(dataBytes, openGroupV2.room, openGroupV2.server).get() DotNetAPI.UploadResult(result, "${openGroupV2.server}/files/$result", byteArrayOf()) + } else { + FileServerAPI.shared.uploadAttachment(server, attachmentData) } handleSuccess(attachment, attachmentKey, uploadResult) } catch (e: java.lang.Exception) { @@ -82,7 +77,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess } private fun handleSuccess(attachment: SignalServiceAttachmentStream, attachmentKey: ByteArray, uploadResult: DotNetAPI.UploadResult) { - Log.w(TAG, "Attachment uploaded successfully.") + Log.d(TAG, "Attachment uploaded successfully.") delegate?.handleJobSucceeded(this) MessagingModuleConfiguration.shared.messageDataProvider.updateAttachmentAfterUploadSucceeded(attachmentID, attachment, attachmentKey, uploadResult) MessagingModuleConfiguration.shared.storage.resumeMessageSendJobIfNeeded(messageSendJobID) @@ -119,10 +114,11 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess val output = Output(serializedMessage) kryo.writeObject(output, message) output.close() - return Data.Builder().putLong(KEY_ATTACHMENT_ID, attachmentID) - .putString(KEY_THREAD_ID, threadID) - .putByteArray(KEY_MESSAGE, serializedMessage) - .putString(KEY_MESSAGE_SEND_JOB_ID, messageSendJobID) + return Data.Builder() + .putLong(ATTACHMENT_ID_KEY, attachmentID) + .putString(THREAD_ID_KEY, threadID) + .putByteArray(MESSAGE_KEY, serializedMessage) + .putString(MESSAGE_SEND_JOB_ID_KEY, messageSendJobID) .build(); } @@ -133,12 +129,17 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess class Factory: Job.Factory { override fun create(data: Data): AttachmentUploadJob { - val serializedMessage = data.getByteArray(KEY_MESSAGE) + val serializedMessage = data.getByteArray(MESSAGE_KEY) val kryo = Kryo() val input = Input(serializedMessage) val message: Message = kryo.readObject(input, Message::class.java) input.close() - return AttachmentUploadJob(data.getLong(KEY_ATTACHMENT_ID), data.getString(KEY_THREAD_ID)!!, message, data.getString(KEY_MESSAGE_SEND_JOB_ID)!!) + return AttachmentUploadJob( + data.getLong(ATTACHMENT_ID_KEY), + data.getString(THREAD_ID_KEY)!!, + message, + data.getString(MESSAGE_SEND_JOB_ID_KEY)!! + ) } } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt index 4693fddf4a..b680734bb8 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/Job.kt @@ -8,21 +8,20 @@ interface Job { val maxFailureCount: Int companion object { + // Keys used for database storage - private val KEY_ID = "id" - private val KEY_FAILURE_COUNT = "failure_count" + private val ID_KEY = "id" + private val FAILURE_COUNT_KEY = "failure_count" } fun execute() fun serialize(): Data - /** - * Returns the key that can be used to find the relevant factory needed to create your job. - */ fun getFactoryKey(): String interface Factory { + fun create(data: Data): T } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index cffb2db7d6..a89338cf3c 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -44,14 +44,15 @@ class JobQueue : JobDelegate { } companion object { + @JvmStatic val shared: JobQueue by lazy { JobQueue() } } private fun Job.canExecuteParallel(): Boolean { return this.javaClass in arrayOf( - AttachmentUploadJob::class.java, - AttachmentDownloadJob::class.java + AttachmentUploadJob::class.java, + AttachmentDownloadJob::class.java ) } @@ -68,7 +69,6 @@ class JobQueue : JobDelegate { val currentTime = System.currentTimeMillis() jobTimestampMap.putIfAbsent(currentTime, AtomicInteger()) job.id = currentTime.toString() + jobTimestampMap[currentTime]!!.getAndIncrement().toString() - MessagingModuleConfiguration.shared.storage.persistJob(job) } @@ -78,25 +78,26 @@ class JobQueue : JobDelegate { return } hasResumedPendingJobs = true - val allJobTypes = listOf(AttachmentUploadJob.KEY, - AttachmentDownloadJob.KEY, - MessageReceiveJob.KEY, - MessageSendJob.KEY, - NotifyPNServerJob.KEY + val allJobTypes = listOf( + AttachmentUploadJob.KEY, + AttachmentDownloadJob.KEY, + MessageReceiveJob.KEY, + MessageSendJob.KEY, + NotifyPNServerJob.KEY ) allJobTypes.forEach { type -> val allPendingJobs = MessagingModuleConfiguration.shared.storage.getAllPendingJobs(type) val pendingJobs = mutableListOf() for ((id, job) in allPendingJobs) { if (job == null) { - // job failed to serialize, remove it from the DB + // Job failed to deserialize, remove it from the DB handleJobFailedPermanently(id) } else { pendingJobs.add(job) } } pendingJobs.sortedBy { it.id }.forEach { job -> - Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.") + Log.i("Loki", "Resuming pending job of type: ${job::class.simpleName}.") queue.offer(job) // Offer always called on unlimited capacity } } @@ -110,15 +111,15 @@ class JobQueue : JobDelegate { override fun handleJobFailed(job: Job, error: Exception) { job.failureCount += 1 val storage = MessagingModuleConfiguration.shared.storage - if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")} + if (storage.isJobCanceled(job)) { return Log.i("Loki", "${job::class.simpleName} canceled.")} if (job.failureCount == job.maxFailureCount) { handleJobFailedPermanently(job, error) } else { storage.persistJob(job) val retryInterval = getRetryInterval(job) - Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).") + Log.i("Loki", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).") timer.schedule(delay = retryInterval) { - Log.i("Jobs", "Retrying ${job::class.simpleName}.") + Log.i("Loki", "Retrying ${job::class.simpleName}.") queue.offer(job) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt index 7c527bebbf..ebca416250 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageReceiveJob.kt @@ -11,7 +11,6 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val override var id: String? = null override var failureCount: Int = 0 - // Settings override val maxFailureCount: Int = 10 companion object { val TAG = MessageReceiveJob::class.simpleName @@ -20,10 +19,11 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val private val RECEIVE_LOCK = Object() // Keys used for database storage - private val KEY_DATA = "data" - private val KEY_IS_BACKGROUND_POLL = "is_background_poll" - private val KEY_OPEN_GROUP_MESSAGE_SERVER_ID = "openGroupMessageServerID" - private val KEY_OPEN_GROUP_ID = "open_group_id" + private val DATA_KEY = "data" + // FIXME: We probably shouldn't be using this job when background polling + private val IS_BACKGROUND_POLL_KEY = "is_background_poll" + private val OPEN_GROUP_MESSAGE_SERVER_ID_KEY = "openGroupMessageServerID" + private val OPEN_GROUP_ID_KEY = "open_group_id" } override fun execute() { @@ -35,19 +35,18 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val try { val isRetry: Boolean = failureCount != 0 val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID, isRetry) - synchronized(RECEIVE_LOCK) { + synchronized(RECEIVE_LOCK) { // FIXME: Do we need this? MessageReceiver.handle(message, proto, this.openGroupID) } this.handleSuccess() deferred.resolve(Unit) } catch (e: Exception) { - Log.e(TAG, "Couldn't receive message due to error", e) - val error = e as? MessageReceiver.Error - if (error != null && !error.isRetryable) { - Log.e("Loki", "Message receive job permanently failed due to error", e) - this.handlePermanentFailure(error) + Log.e(TAG, "Couldn't receive message.", e) + if (e is MessageReceiver.Error && !e.isRetryable) { + Log.e("Loki", "Message receive job permanently failed.", e) + this.handlePermanentFailure(e) } else { - Log.e("Loki", "Couldn't receive message due to error", e) + Log.e("Loki", "Couldn't receive message.", e) this.handleFailure(e) } deferred.resolve(Unit) // The promise is just used to keep track of when we're done @@ -68,10 +67,10 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val } override fun serialize(): Data { - val builder = Data.Builder().putByteArray(KEY_DATA, data) - .putBoolean(KEY_IS_BACKGROUND_POLL, isBackgroundPoll) - openGroupMessageServerID?.let { builder.putLong(KEY_OPEN_GROUP_MESSAGE_SERVER_ID, openGroupMessageServerID) } - openGroupID?.let { builder.putString(KEY_OPEN_GROUP_ID, openGroupID) } + val builder = Data.Builder().putByteArray(DATA_KEY, data) + .putBoolean(IS_BACKGROUND_POLL_KEY, isBackgroundPoll) + openGroupMessageServerID?.let { builder.putLong(OPEN_GROUP_MESSAGE_SERVER_ID_KEY, it) } + openGroupID?.let { builder.putString(OPEN_GROUP_ID_KEY, it) } return builder.build(); } @@ -82,7 +81,12 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val class Factory: Job.Factory { override fun create(data: Data): MessageReceiveJob { - return MessageReceiveJob(data.getByteArray(KEY_DATA), data.getBoolean(KEY_IS_BACKGROUND_POLL), data.getLong(KEY_OPEN_GROUP_MESSAGE_SERVER_ID), data.getString(KEY_OPEN_GROUP_ID)) + return MessageReceiveJob( + data.getByteArray(DATA_KEY), + data.getBoolean(IS_BACKGROUND_POLL_KEY), + data.getLong(OPEN_GROUP_MESSAGE_SERVER_ID_KEY), + data.getString(OPEN_GROUP_ID_KEY) + ) } } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index 83822c4fc7..48266764a7 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -15,22 +15,22 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { override var id: String? = null override var failureCount: Int = 0 - // Settings override val maxFailureCount: Int = 10 + companion object { val TAG = MessageSendJob::class.simpleName val KEY: String = "MessageSendJob" // Keys used for database storage - private val KEY_MESSAGE = "message" - private val KEY_DESTINATION = "destination" + private val MESSAGE_KEY = "message" + private val DESTINATION_KEY = "destination" } override fun execute() { val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider val message = message as? VisibleMessage - message?.let { - if(!messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) return // The message has been deleted + if (message != null) { + if (!messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) return // The message has been deleted val attachmentIDs = mutableListOf() attachmentIDs.addAll(message.attachmentIDs) message.quote?.let { it.attachmentID?.let { attachmentID -> attachmentIDs.add(attachmentID) } } @@ -51,9 +51,8 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { this.handleSuccess() }.fail { exception -> Log.e(TAG, "Couldn't send message due to error: $exception.") - val e = exception as? MessageSender.Error - e?.let { - if (!e.isRetryable) this.handlePermanentFailure(e) + if (exception is MessageSender.Error) { + if (!exception.isRetryable) { this.handlePermanentFailure(exception) } } this.handleFailure(exception) } @@ -70,8 +69,10 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { private fun handleFailure(error: Exception) { Log.w(TAG, "Failed to send $message::class.simpleName.") val message = message as? VisibleMessage - message?.let { - if(!MessagingModuleConfiguration.shared.messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) return // The message has been deleted + if (message != null) { + if (!MessagingModuleConfiguration.shared.messageDataProvider.isOutgoingMessage(message.sentTimestamp!!)) { + return // The message has been deleted + } } delegate?.handleJobFailed(this, error) } @@ -80,34 +81,42 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { val kryo = Kryo() kryo.isRegistrationRequired = false val output = Output(ByteArray(4096), 10_000_000) + // Message kryo.writeClassAndObject(output, message) output.close() val serializedMessage = output.toBytes() output.clear() + // Destination kryo.writeClassAndObject(output, destination) output.close() val serializedDestination = output.toBytes() - return Data.Builder().putByteArray(KEY_MESSAGE, serializedMessage) - .putByteArray(KEY_DESTINATION, serializedDestination) - .build(); + output.clear() + // Serialize + return Data.Builder() + .putByteArray(MESSAGE_KEY, serializedMessage) + .putByteArray(DESTINATION_KEY, serializedDestination) + .build() } override fun getFactoryKey(): String { return KEY } - class Factory: Job.Factory { + class Factory : Job.Factory { override fun create(data: Data): MessageSendJob { - val serializedMessage = data.getByteArray(KEY_MESSAGE) - val serializedDestination = data.getByteArray(KEY_DESTINATION) + val serializedMessage = data.getByteArray(MESSAGE_KEY) + val serializedDestination = data.getByteArray(DESTINATION_KEY) val kryo = Kryo() - var input = Input(serializedMessage) - val message = kryo.readClassAndObject(input) as Message - input.close() - input = Input(serializedDestination) - val destination = kryo.readClassAndObject(input) as Destination - input.close() + // Message + val messageInput = Input(serializedMessage) + val message = kryo.readClassAndObject(messageInput) as Message + messageInput.close() + // Destination + val destinationInput = Input(serializedDestination) + val destination = kryo.readClassAndObject(destinationInput) as Destination + destinationInput.close() + // Return return MessageSendJob(message, destination) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt index fb99f54f56..d5a1674bad 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/NotifyPNServerJob.kt @@ -21,16 +21,14 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job { override var id: String? = null override var failureCount: Int = 0 - // Settings override val maxFailureCount: Int = 20 companion object { val KEY: String = "NotifyPNServerJob" // Keys used for database storage - private val KEY_MESSAGE = "message" + private val MESSAGE_KEY = "message" } - // Running override fun execute() { val server = PushNotificationAPI.server val parameters = mapOf( "data" to message.data, "send_to" to message.recipient ) @@ -41,10 +39,10 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job { OnionRequestAPI.sendOnionRequest(request.build(), server, PushNotificationAPI.serverPublicKey, "/loki/v2/lsrpc").map { json -> val code = json["code"] as? Int if (code == null || code == 0) { - Log.d("Loki", "[Loki] Couldn't notify PN server due to error: ${json["message"] as? String ?: "null"}.") + Log.d("Loki", "Couldn't notify PN server due to error: ${json["message"] as? String ?: "null"}.") } }.fail { exception -> - Log.d("Loki", "[Loki] Couldn't notify PN server due to error: $exception.") + Log.d("Loki", "Couldn't notify PN server due to error: $exception.") } }.success { handleSuccess() @@ -68,17 +66,17 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job { val output = Output(serializedMessage) kryo.writeObject(output, message) output.close() - return Data.Builder().putByteArray(KEY_MESSAGE, serializedMessage).build(); + return Data.Builder().putByteArray(MESSAGE_KEY, serializedMessage).build(); } override fun getFactoryKey(): String { return KEY } - class Factory: Job.Factory { + class Factory : Job.Factory { override fun create(data: Data): NotifyPNServerJob { - val serializedMessage = data.getByteArray(KEY_MESSAGE) + val serializedMessage = data.getByteArray(MESSAGE_KEY) val kryo = Kryo() val input = Input(serializedMessage) val message: SnodeMessage = kryo.readObject(input, SnodeMessage::class.java) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobInstantiator.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobInstantiator.kt index bf0a1b2f8a..a6336e9148 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobInstantiator.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobInstantiator.kt @@ -2,11 +2,11 @@ package org.session.libsession.messaging.jobs class SessionJobInstantiator(private val jobFactories: Map>) { - fun instantiate(jobFactoryKey: String, data: Data): Job { + fun instantiate(jobFactoryKey: String, data: Data): Job? { if (jobFactories.containsKey(jobFactoryKey)) { - return jobFactories[jobFactoryKey]?.create(data) ?: throw IllegalStateException("Tried to instantiate a job with key '$jobFactoryKey', but no matching factory was found.") + return jobFactories[jobFactoryKey]?.create(data) } else { - throw IllegalStateException("Tried to instantiate a job with key '$jobFactoryKey', but no matching factory was found.") + return null } } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobManagerFactories.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobManagerFactories.kt index e7c02361e1..c681a67f3d 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobManagerFactories.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/SessionJobManagerFactories.kt @@ -3,6 +3,7 @@ package org.session.libsession.messaging.jobs class SessionJobManagerFactories { companion object { + fun getSessionJobFactories(): Map> { return mapOf( AttachmentDownloadJob.KEY to AttachmentDownloadJob.Factory(),