Fixed a couple of issues with the OpenGroupDeleteJob

Updated the OpenGroupDispatcher to have a thread limit of 8 (was previously unlimited which could result in the app getting flooded with threads under certain conditions)
Updated the OpenGroupDeleteJob to do bulk deletions (instead of individual message deletions)
Updated the OpenGroupDeleteJob to catch and report failures (wasn't previously happening)
This commit is contained in:
Morgan Pretty 2023-01-20 09:02:59 +11:00
parent cae15a200d
commit 810430e806
10 changed files with 169 additions and 8 deletions

View File

@ -176,6 +176,11 @@ class DatabaseAttachmentProvider(context: Context, helper: SQLCipherOpenHelper)
return messageDB.getMessageID(serverId, threadId) return messageDB.getMessageID(serverId, threadId)
} }
override fun getMessageIDs(serverIds: List<Long>, threadId: Long): Pair<List<Long>, List<Long>> {
val messageDB = DatabaseComponent.get(context).lokiMessageDatabase()
return messageDB.getMessageIDs(serverIds, threadId)
}
override fun deleteMessage(messageID: Long, isSms: Boolean) { override fun deleteMessage(messageID: Long, isSms: Boolean) {
val messagingDatabase: MessagingDatabase = if (isSms) DatabaseComponent.get(context).smsDatabase() val messagingDatabase: MessagingDatabase = if (isSms) DatabaseComponent.get(context).smsDatabase()
else DatabaseComponent.get(context).mmsDatabase() else DatabaseComponent.get(context).mmsDatabase()
@ -184,6 +189,15 @@ class DatabaseAttachmentProvider(context: Context, helper: SQLCipherOpenHelper)
DatabaseComponent.get(context).lokiMessageDatabase().deleteMessageServerHash(messageID) DatabaseComponent.get(context).lokiMessageDatabase().deleteMessageServerHash(messageID)
} }
override fun deleteMessages(messageIDs: List<Long>, threadId: Long, isSms: Boolean) {
val messagingDatabase: MessagingDatabase = if (isSms) DatabaseComponent.get(context).smsDatabase()
else DatabaseComponent.get(context).mmsDatabase()
messagingDatabase.deleteMessages(messageIDs.toLongArray(), threadId)
DatabaseComponent.get(context).lokiMessageDatabase().deleteMessages(messageIDs)
DatabaseComponent.get(context).lokiMessageDatabase().deleteMessageServerHashes(messageIDs)
}
override fun updateMessageAsDeleted(timestamp: Long, author: String) { override fun updateMessageAsDeleted(timestamp: Long, author: String) {
val database = DatabaseComponent.get(context).mmsSmsDatabase() val database = DatabaseComponent.get(context).mmsSmsDatabase()
val address = Address.fromSerialized(author) val address = Address.fromSerialized(author)

View File

@ -35,6 +35,7 @@ import com.bumptech.glide.Glide;
import net.zetetic.database.sqlcipher.SQLiteDatabase; import net.zetetic.database.sqlcipher.SQLiteDatabase;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONException; import org.json.JSONException;
import org.session.libsession.messaging.sending_receiving.attachments.Attachment; import org.session.libsession.messaging.sending_receiving.attachments.Attachment;
@ -318,6 +319,28 @@ public class AttachmentDatabase extends Database {
notifyAttachmentListeners(); notifyAttachmentListeners();
} }
@SuppressWarnings("ResultOfMethodCallIgnored")
void deleteAttachmentsForMessages(long[] mmsIds) {
SQLiteDatabase database = databaseHelper.getWritableDatabase();
Cursor cursor = null;
String mmsIdString = StringUtils.join(mmsIds, ',');
try {
cursor = database.query(TABLE_NAME, new String[] {DATA, THUMBNAIL, CONTENT_TYPE}, MMS_ID + " IN (?)",
new String[] {mmsIdString}, null, null, null);
while (cursor != null && cursor.moveToNext()) {
deleteAttachmentOnDisk(cursor.getString(0), cursor.getString(1), cursor.getString(2));
}
} finally {
if (cursor != null)
cursor.close();
}
database.delete(TABLE_NAME, MMS_ID + " IN (?)", new String[] {mmsIdString});
notifyAttachmentListeners();
}
public void deleteAttachment(@NonNull AttachmentId id) { public void deleteAttachment(@NonNull AttachmentId id) {
SQLiteDatabase database = databaseHelper.getWritableDatabase(); SQLiteDatabase database = databaseHelper.getWritableDatabase();

View File

@ -8,6 +8,7 @@ import androidx.annotation.NonNull;
import net.zetetic.database.sqlcipher.SQLiteDatabase; import net.zetetic.database.sqlcipher.SQLiteDatabase;
import org.apache.commons.lang3.StringUtils;
import org.session.libsession.utilities.Address; import org.session.libsession.utilities.Address;
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper; import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper;
@ -109,6 +110,11 @@ public class GroupReceiptDatabase extends Database {
db.delete(TABLE_NAME, MMS_ID + " = ?", new String[] {String.valueOf(mmsId)}); db.delete(TABLE_NAME, MMS_ID + " = ?", new String[] {String.valueOf(mmsId)});
} }
void deleteRowsForMessages(long[] mmsIds) {
SQLiteDatabase db = databaseHelper.getWritableDatabase();
db.delete(TABLE_NAME, MMS_ID + " IN (?)", new String[] {StringUtils.join(mmsIds, ',')});
}
void deleteAllRows() { void deleteAllRows() {
SQLiteDatabase db = databaseHelper.getWritableDatabase(); SQLiteDatabase db = databaseHelper.getWritableDatabase();
db.delete(TABLE_NAME, null, null); db.delete(TABLE_NAME, null, null);

View File

@ -77,6 +77,25 @@ class LokiMessageDatabase(context: Context, helper: SQLCipherOpenHelper) : Datab
database.endTransaction() database.endTransaction()
} }
fun deleteMessages(messageIDs: List<Long>) {
val database = databaseHelper.writableDatabase
database.beginTransaction()
database.delete(
messageIDTable,
"${Companion.messageID} IN (${messageIDs.map { "?" }.joinToString(",")})",
messageIDs.map { "$it" }.toTypedArray()
)
database.delete(
messageThreadMappingTable,
"${Companion.messageID} IN (${messageIDs.map { "?" }.joinToString(",")})",
messageIDs.map { "$it" }.toTypedArray()
)
database.setTransactionSuccessful()
database.endTransaction()
}
/** /**
* @return pair of sms or mms table-specific ID and whether it is in SMS table * @return pair of sms or mms table-specific ID and whether it is in SMS table
*/ */
@ -96,6 +115,37 @@ class LokiMessageDatabase(context: Context, helper: SQLCipherOpenHelper) : Datab
} }
} }
fun getMessageIDs(serverIDs: List<Long>, threadID: Long): Pair<List<Long>, List<Long>> {
val database = databaseHelper.readableDatabase
// Retrieve the message ids
val messageIdCursor = database
.rawQuery(
"""
SELECT ${messageThreadMappingTable}.${messageID}, ${messageIDTable}.${messageType}
FROM ${messageThreadMappingTable}
JOIN ${messageIDTable} ON ${messageIDTable}.message_id = ${messageThreadMappingTable}.${messageID}
WHERE (
${messageThreadMappingTable}.${Companion.threadID} = $threadID AND
${messageThreadMappingTable}.${Companion.serverID} IN (${serverIDs.joinToString(",")})
)
"""
)
val smsMessageIds: MutableList<Long> = mutableListOf()
val mmsMessageIds: MutableList<Long> = mutableListOf()
while (messageIdCursor.moveToNext()) {
if (messageIdCursor.getInt(1) == SMS_TYPE) {
smsMessageIds.add(messageIdCursor.getLong(0))
}
else {
mmsMessageIds.add(messageIdCursor.getLong(0))
}
}
return Pair(smsMessageIds, mmsMessageIds)
}
override fun setServerID(messageID: Long, serverID: Long, isSms: Boolean) { override fun setServerID(messageID: Long, serverID: Long, isSms: Boolean) {
val database = databaseHelper.writableDatabase val database = databaseHelper.writableDatabase
val contentValues = ContentValues(3) val contentValues = ContentValues(3)
@ -183,6 +233,15 @@ class LokiMessageDatabase(context: Context, helper: SQLCipherOpenHelper) : Datab
database.delete(messageHashTable, "${Companion.messageID} = ?", arrayOf(messageID.toString())) database.delete(messageHashTable, "${Companion.messageID} = ?", arrayOf(messageID.toString()))
} }
fun deleteMessageServerHashes(messageIDs: List<Long>) {
val database = databaseHelper.writableDatabase
database.delete(
messageHashTable,
"${Companion.messageID} IN (${messageIDs.map { "?" }.joinToString(",")})",
messageIDs.map { "$it" }.toTypedArray()
)
}
fun migrateThreadId(legacyThreadId: Long, newThreadId: Long) { fun migrateThreadId(legacyThreadId: Long, newThreadId: Long) {
val database = databaseHelper.writableDatabase val database = databaseHelper.writableDatabase
val contentValues = ContentValues(1) val contentValues = ContentValues(1)

View File

@ -42,6 +42,7 @@ public abstract class MessagingDatabase extends Database implements MmsSmsColumn
public abstract void markAsDeleted(long messageId, boolean read); public abstract void markAsDeleted(long messageId, boolean read);
public abstract boolean deleteMessage(long messageId); public abstract boolean deleteMessage(long messageId);
public abstract boolean deleteMessages(long[] messageId, long threadId);
public abstract void updateThreadId(long fromId, long toId); public abstract void updateThreadId(long fromId, long toId);

View File

@ -995,6 +995,23 @@ class MmsDatabase(context: Context, databaseHelper: SQLCipherOpenHelper) : Messa
return threadDeleted return threadDeleted
} }
override fun deleteMessages(messageIds: LongArray, threadId: Long): Boolean {
val attachmentDatabase = get(context).attachmentDatabase()
val groupReceiptDatabase = get(context).groupReceiptDatabase()
queue(Runnable { attachmentDatabase.deleteAttachmentsForMessages(messageIds) })
groupReceiptDatabase.deleteRowsForMessages(messageIds)
val database = databaseHelper.writableDatabase
database!!.delete(TABLE_NAME, ID_IN, arrayOf(messageIds.joinToString(",")))
val threadDeleted = get(context).threadDatabase().update(threadId, false)
notifyConversationListeners(threadId)
notifyStickerListeners()
notifyStickerPackListeners()
return threadDeleted
}
override fun updateThreadId(fromId: Long, toId: Long) { override fun updateThreadId(fromId: Long, toId: Long) {
val contentValues = ContentValues(1) val contentValues = ContentValues(1)
contentValues.put(THREAD_ID, toId) contentValues.put(THREAD_ID, toId)

View File

@ -31,6 +31,7 @@ import com.annimon.stream.Stream;
import net.zetetic.database.sqlcipher.SQLiteDatabase; import net.zetetic.database.sqlcipher.SQLiteDatabase;
import net.zetetic.database.sqlcipher.SQLiteStatement; import net.zetetic.database.sqlcipher.SQLiteStatement;
import org.apache.commons.lang3.StringUtils;
import org.session.libsession.messaging.calls.CallMessageType; import org.session.libsession.messaging.calls.CallMessageType;
import org.session.libsession.messaging.messages.signal.IncomingGroupMessage; import org.session.libsession.messaging.messages.signal.IncomingGroupMessage;
import org.session.libsession.messaging.messages.signal.IncomingTextMessage; import org.session.libsession.messaging.messages.signal.IncomingTextMessage;
@ -52,6 +53,7 @@ import org.thoughtcrime.securesms.dependencies.DatabaseComponent;
import java.io.IOException; import java.io.IOException;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -596,6 +598,30 @@ public class SmsDatabase extends MessagingDatabase {
return threadDeleted; return threadDeleted;
} }
@Override
public boolean deleteMessages(long[] messageIds, long threadId) {
String[] argsArray = new String[messageIds.length];
String[] argValues = new String[messageIds.length];
Arrays.fill(argsArray, "?");
for (int i = 0; i < messageIds.length; i++) {
argValues[i] = (messageIds[i] + "");
}
String combinedMessageIdArgss = StringUtils.join(messageIds, ',');
String combinedMessageIds = StringUtils.join(messageIds, ',');
Log.i("MessageDatabase", "Deleting: " + combinedMessageIds);
SQLiteDatabase db = databaseHelper.getWritableDatabase();
db.delete(
TABLE_NAME,
ID + " IN (" + StringUtils.join(argsArray, ',') + ")",
argValues
);
boolean threadDeleted = DatabaseComponent.get(context).threadDatabase().update(threadId, false);
notifyConversationListeners(threadId);
return threadDeleted;
}
@Override @Override
public void updateThreadId(long fromId, long toId) { public void updateThreadId(long fromId, long toId) {
ContentValues contentValues = new ContentValues(1); ContentValues contentValues = new ContentValues(1);

View File

@ -20,7 +20,9 @@ interface MessageDataProvider {
* @return pair of sms or mms table-specific ID and whether it is in SMS table * @return pair of sms or mms table-specific ID and whether it is in SMS table
*/ */
fun getMessageID(serverId: Long, threadId: Long): Pair<Long, Boolean>? fun getMessageID(serverId: Long, threadId: Long): Pair<Long, Boolean>?
fun getMessageIDs(serverIDs: List<Long>, threadID: Long): Pair<List<Long>, List<Long>>
fun deleteMessage(messageID: Long, isSms: Boolean) fun deleteMessage(messageID: Long, isSms: Boolean)
fun deleteMessages(messageIDs: List<Long>, threadId: Long, isSms: Boolean)
fun updateMessageAsDeleted(timestamp: Long, author: String) fun updateMessageAsDeleted(timestamp: Long, author: String)
fun getServerHashForMessage(messageID: Long): String? fun getServerHashForMessage(messageID: Long): String?
fun getDatabaseAttachment(attachmentId: Long): DatabaseAttachment? fun getDatabaseAttachment(attachmentId: Long): DatabaseAttachment?

View File

@ -26,7 +26,7 @@ class JobQueue : JobDelegate {
private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>() private val jobTimestampMap = ConcurrentHashMap<Long, AtomicInteger>()
private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() private val rxDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val rxMediaDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher() private val rxMediaDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
private val openGroupDispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher() private val openGroupDispatcher = Executors.newFixedThreadPool(8).asCoroutineDispatcher()//Executors.newCachedThreadPool().asCoroutineDispatcher()
private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() private val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
private val scope = CoroutineScope(Dispatchers.Default) + SupervisorJob() private val scope = CoroutineScope(Dispatchers.Default) + SupervisorJob()
private val queue = Channel<Job>(UNLIMITED) private val queue = Channel<Job>(UNLIMITED)

View File

@ -23,15 +23,28 @@ class OpenGroupDeleteJob(private val messageServerIds: LongArray, private val th
val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val numberToDelete = messageServerIds.size val numberToDelete = messageServerIds.size
Log.d(TAG, "Deleting $numberToDelete messages") Log.d(TAG, "Deleting $numberToDelete messages")
var numberDeleted = 0
messageServerIds.forEach { serverId -> // FIXME: This entire process should probably run in a transaction (with the attachment deletion happening only if it succeeded)
val (messageId, isSms) = dataProvider.getMessageID(serverId, threadId) ?: return@forEach try {
dataProvider.deleteMessage(messageId, isSms) val messageIds = dataProvider.getMessageIDs(messageServerIds.toList(), threadId)
numberDeleted++
// Delete the SMS messages
if (messageIds.first.isNotEmpty()) {
dataProvider.deleteMessages(messageIds.first, threadId, true)
} }
Log.d(TAG, "Deleted $numberDeleted messages successfully")
// Delete the MMS messages
if (messageIds.second.isNotEmpty()) {
dataProvider.deleteMessages(messageIds.second, threadId, false)
}
Log.d(TAG, "Deleted ${messageIds.first.size + messageIds.second.size} messages successfully")
delegate?.handleJobSucceeded(this) delegate?.handleJobSucceeded(this)
} }
catch (e: Exception) {
delegate?.handleJobFailed(this, e)
}
}
override fun serialize(): Data = Data.Builder() override fun serialize(): Data = Data.Builder()
.putLongArray(MESSAGE_IDS, messageServerIds) .putLongArray(MESSAGE_IDS, messageServerIds)