fix: added more migration code for deleting unnecessary threads and groups, fixed a post-migration last seen issue on last item (current read is now), comment out actual network sync while testing migrations

This commit is contained in:
0x330a 2023-05-19 20:01:50 +10:00
parent 7ed12ce87d
commit 371fb20b6e
8 changed files with 81 additions and 58 deletions

View File

@ -994,6 +994,10 @@ class ConversationActivityV2 : PassphraseRequiredActionBarActivity(), InputBarDe
showOrHideScrollToBottomButton()
val firstVisiblePosition = layoutManager?.findFirstVisibleItemPosition() ?: RecyclerView.NO_POSITION
if (!firstLoad.get() && firstVisiblePosition != RecyclerView.NO_POSITION) {
if (firstVisiblePosition == 0) {
// last item, set it to now?
bufferedLastSeenChannel.trySend(SnodeAPI.nowWithOffset)
}
val visibleItemTimestamp = adapter.getTimestampForItemAt(firstVisiblePosition)
if (visibleItemTimestamp != null) {
bufferedLastSeenChannel.trySend(visibleItemTimestamp)

View File

@ -36,9 +36,9 @@ public class GroupDatabase extends Database implements LokiOpenGroupDatabaseProt
@SuppressWarnings("unused")
private static final String TAG = GroupDatabase.class.getSimpleName();
static final String TABLE_NAME = "groups";
public static final String TABLE_NAME = "groups";
private static final String ID = "_id";
static final String GROUP_ID = "group_id";
public static final String GROUP_ID = "group_id";
private static final String TITLE = "title";
private static final String MEMBERS = "members";
private static final String ZOMBIE_MEMBERS = "zombie_members";

View File

@ -99,6 +99,8 @@ open class Storage(context: Context, helper: SQLCipherOpenHelper, private val co
if (!getRecipientApproved(address)) return // don't store unapproved / message requests
if (getUserPublicKey() == address.serialize()) {
Log.d("Loki-DBG", "NTS created, context:\n${Thread.currentThread().stackTrace.joinToString("\n")}")
} else {
Log.d("Loki-DBG", "Thread created ${address.serialize()}")
}
val volatile = configFactory.convoVolatile ?: return

View File

@ -563,6 +563,7 @@ public class ThreadDatabase extends Database {
db.execSQL(reflectUpdates, new Object[]{threadId});
db.setTransactionSuccessful();
db.endTransaction();
notifyConversationListeners(threadId);
notifyConversationListListeners();
return true;
}

View File

@ -41,6 +41,7 @@ import org.thoughtcrime.securesms.database.SessionJobDatabase;
import org.thoughtcrime.securesms.database.SmsDatabase;
import org.thoughtcrime.securesms.database.ThreadDatabase;
import org.thoughtcrime.securesms.notifications.NotificationChannels;
import org.thoughtcrime.securesms.util.ConfigurationMessageUtilities;
import java.io.File;
@ -593,7 +594,8 @@ public class SQLCipherOpenHelper extends SQLiteOpenHelper {
if (oldVersion < lokiV41) {
db.execSQL(ConfigDatabase.CREATE_CONFIG_TABLE_COMMAND);
// TODO: delete threads where necessary for one to ones
db.execSQL(ConfigurationMessageUtilities.DELETE_INACTIVE_GROUPS);
db.execSQL(ConfigurationMessageUtilities.DELETE_INACTIVE_ONE_TO_ONES);
}
db.setTransactionSuccessful();

View File

@ -24,6 +24,9 @@ import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsession.utilities.WindowDebouncer
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.toHexString
import org.thoughtcrime.securesms.database.GroupDatabase
import org.thoughtcrime.securesms.database.GroupMemberDatabase
import org.thoughtcrime.securesms.database.ThreadDatabase
import org.thoughtcrime.securesms.dependencies.DatabaseComponent
import java.util.Timer
@ -261,4 +264,15 @@ object ConfigurationMessageUtilities {
return dump
}
@JvmField
val DELETE_INACTIVE_GROUPS: String = """
DELETE FROM ${GroupDatabase.TABLE_NAME} WHERE ${GroupDatabase.GROUP_ID} IN (SELECT ${ThreadDatabase.ADDRESS} FROM ${ThreadDatabase.TABLE_NAME} WHERE ${ThreadDatabase.MESSAGE_COUNT} <= 0 AND ${ThreadDatabase.ADDRESS} LIKE '${GroupUtil.CLOSED_GROUP_PREFIX}%');
DELETE FROM ${ThreadDatabase.TABLE_NAME} WHERE ${ThreadDatabase.ADDRESS} IN (SELECT ${ThreadDatabase.ADDRESS} FROM ${ThreadDatabase.TABLE_NAME} WHERE ${ThreadDatabase.MESSAGE_COUNT} <= 0 AND ${ThreadDatabase.ADDRESS} LIKE '${GroupUtil.CLOSED_GROUP_PREFIX}%');
""".trimIndent()
@JvmField
val DELETE_INACTIVE_ONE_TO_ONES: String = """
DELETE FROM ${ThreadDatabase.TABLE_NAME} WHERE ${ThreadDatabase.MESSAGE_COUNT} <= 0 AND ${ThreadDatabase.ADDRESS} NOT LIKE '${GroupUtil.CLOSED_GROUP_PREFIX}%' AND ${ThreadDatabase.ADDRESS} NOT LIKE '${GroupUtil.OPEN_GROUP_PREFIX}%' AND ${ThreadDatabase.ADDRESS} NOT LIKE '${GroupUtil.OPEN_GROUP_INBOX_PREFIX}%';
""".trimIndent()
}

View File

@ -91,60 +91,60 @@ data class ConfigurationSyncJob(val destination: Destination): Job {
}
// TODO: re-add all this to do actual network sync job
val batchResponse = SnodeAPI.getSingleTargetSnode(destination.destinationPublicKey()).bind { snode ->
SnodeAPI.getRawBatchResponse(
snode,
destination.destinationPublicKey(),
allRequests,
sequence = true
)
}
try {
val rawResponses = batchResponse.get()
@Suppress("UNCHECKED_CAST")
val responseList = (rawResponses["results"] as List<RawResponse>)
// we are always adding in deletions at the end
val deletionResponse = if (toDeleteRequest != null) responseList.last() else null
val deletedHashes = deletionResponse?.let {
@Suppress("UNCHECKED_CAST")
// get the sub-request body
(deletionResponse["body"] as? RawResponse)?.let { body ->
// get the swarm dict
body["swarm"] as? RawResponse
}?.mapValues { (_, swarmDict) ->
// get the deleted values from dict
((swarmDict as? RawResponse)?.get("deleted") as? List<String>)?.toSet() ?: emptySet()
}?.values?.reduce { acc, strings ->
// create an intersection of all deleted hashes (common between all swarm nodes)
acc intersect strings
}
} ?: emptySet()
// at this point responseList index should line up with configsRequiringPush index
configsRequiringPush.forEachIndexed { index, config ->
val (toPushMessage, _) = batchObjects[index]!!
val response = responseList[index]
val responseBody = response["body"] as? RawResponse
val insertHash = responseBody?.get("hash") as? String ?: run {
Log.w(TAG, "No hash returned for the configuration in namespace ${config.configNamespace()}")
return@forEachIndexed
}
Log.d(TAG, "Hash $insertHash returned from store request for new config")
// confirm pushed seqno
val thisSeqNo = toPushMessage.seqNo
config.confirmPushed(thisSeqNo, insertHash)
Log.d(TAG, "Successfully removed the deleted hashes from ${config.javaClass.simpleName}")
// dump and write config after successful
if (config.needsDump()) { // usually this will be true?
configFactory.persist(config)
}
}
} catch (e: Exception) {
Log.e(TAG, "Error performing batch request", e)
return delegate.handleJobFailed(this, dispatcherName, e)
}
// val batchResponse = SnodeAPI.getSingleTargetSnode(destination.destinationPublicKey()).bind { snode ->
// SnodeAPI.getRawBatchResponse(
// snode,
// destination.destinationPublicKey(),
// allRequests,
// sequence = true
// )
// }
//
// try {
// val rawResponses = batchResponse.get()
// @Suppress("UNCHECKED_CAST")
// val responseList = (rawResponses["results"] as List<RawResponse>)
// // we are always adding in deletions at the end
// val deletionResponse = if (toDeleteRequest != null) responseList.last() else null
// val deletedHashes = deletionResponse?.let {
// @Suppress("UNCHECKED_CAST")
// // get the sub-request body
// (deletionResponse["body"] as? RawResponse)?.let { body ->
// // get the swarm dict
// body["swarm"] as? RawResponse
// }?.mapValues { (_, swarmDict) ->
// // get the deleted values from dict
// ((swarmDict as? RawResponse)?.get("deleted") as? List<String>)?.toSet() ?: emptySet()
// }?.values?.reduce { acc, strings ->
// // create an intersection of all deleted hashes (common between all swarm nodes)
// acc intersect strings
// }
// } ?: emptySet()
//
// // at this point responseList index should line up with configsRequiringPush index
// configsRequiringPush.forEachIndexed { index, config ->
// val (toPushMessage, _) = batchObjects[index]!!
// val response = responseList[index]
// val responseBody = response["body"] as? RawResponse
// val insertHash = responseBody?.get("hash") as? String ?: run {
// Log.w(TAG, "No hash returned for the configuration in namespace ${config.configNamespace()}")
// return@forEachIndexed
// }
// Log.d(TAG, "Hash $insertHash returned from store request for new config")
//
// // confirm pushed seqno
// val thisSeqNo = toPushMessage.seqNo
// config.confirmPushed(thisSeqNo, insertHash)
// Log.d(TAG, "Successfully removed the deleted hashes from ${config.javaClass.simpleName}")
// // dump and write config after successful
// if (config.needsDump()) { // usually this will be true?
// configFactory.persist(config)
// }
// }
// } catch (e: Exception) {
// Log.e(TAG, "Error performing batch request", e)
// return delegate.handleJobFailed(this, dispatcherName, e)
// }
delegate.handleJobSucceeded(this, dispatcherName)
if (shouldRunAgain.get() && storage.getConfigSyncJob(destination) == null) {
// reschedule if something has updated since we started this job

View File

@ -124,7 +124,7 @@ class ConfigurationMessage(var closedGroups: List<ClosedGroup>, var openGroups:
val profileKey = ProfileKeyUtil.getProfileKey(context)
val groups = storage.getAllGroups(includeInactive = false)
for (group in groups) {
if (group.isClosedGroup) {
if (group.isClosedGroup && group.isActive) {
if (!group.members.contains(Address.fromSerialized(storage.getUserPublicKey()!!))) continue
val groupPublicKey = GroupUtil.doubleDecodeGroupID(group.encodedId).toHexString()
val encryptionKeyPair = storage.getLatestClosedGroupEncryptionKeyPair(groupPublicKey) ?: continue