Integrate shared libsession-util library (#1096)

* feat: add some config db basics and DI for it, make the user profile optional, start looking at integrate building from initial dump

* update: get latest util library submodule update

* refactor: fix compile for refactored API

* refactor: naming consistent with library

* feat: add in config storage and injection to common places, managing lifecycle of native instances

* refactor: config database changes, new protos, adding in support for config base namespace queries

* refactor: config query and store use the same format as other platforms

* feat: add batch snode calls and try to poll from all the config namespaces

* fix: add optional namespace in signature and params

* feat: add raw requests for modifying expiry and getting expiries

* feat: add some base config migration logic, start implementing wrappers for conversation and expiry types

* chore: update libsession base

* feat: start integrating conversation wrapper functions

* feat: add basic conversation info volatile types and implementations, start working on tests

* feat: more common library wrapper implementation and test

* fix: tests and compile issues

* fix: fix tests, don't use iterables

* feat: add all iterators and tests

* feat: add in more config factory for volatile

* feat: update request responses and their appropriate processing

* feat: add storage with hashes and some basic profile update logic in config factory probably move that somewhere else

* feat: adding config sync functionality, refactoring jobs to execute in suspend context to do some nice coroutine execution

* refactor: moving some properties around so we have access in libsession

* feat: expand on the config sync job, finish basic implementation to test against

* feat: add forced config sync

* feat: syncs the user profile stuff for now, and errors back to placeholder instead of unknown recipient

* feat: add basic message read logic for synchronizing last reads, need to modify the query to use the last seen instead of the unread count in a subquery possibly for thread display record

* feat: add broken unreads everywhere

* fix: unreads work now for incoming messages, need to sync conv volatile properly still

* feat: batching poll responses properly and handling groups properly

* fix: replace the mark read receiver (from notifications) to use the new set last seen mark read logic

* feat: update to the group list branch

* fix: compile errors from updating library to use latest branch, now requires cmake 3.22.1

* fix: fix the contact tests

* fix: getters weren't getters properly in the config factory, fixed new onboarding from configs

* feat: add the last seen

* feat: start adding user groups wrapper objects

* refactor: add more else branches for unimplemented types

* feat: buffer the last read when in conversation

* feat: add basic contact logic for setting local contact state. Need to implement handling properly

* refactor: trying to just include blocked status for now in updating contacts

* fix: add some more contact syncing: nicknames, approved statuses, blocked statuses

* feat: start implementing hashes in shared lib and refactoring

* feat: start to implement group list info classes and wrappers and refactor to use library based hashes

* feat: incorporate hashes from library, more wrapper for user groups and serialization from c++

* feat: adding more serialization changes for community base info and user groups LGC

* feat: adding more serialize deserialize to legacy closed groups

* feat: finish serial/deserial helper

* feat: just implement deserialize community info

* refactor: refactor tests and wrappers to use less pointers, finish implementing user groups API

* feat: finish latest wrappers fix tests and continue building default generation functions. refactor defaults to be used if no stored data blob in DB

* feat: more usergroup functionality, storage functionality for checking pinned status, adding pinned status for NTS/contacts, move community info parse full url to base community, add StorageProtocol logic for group info

* feat: adding user groups to the list of user configs, refactorign some of the config factory to fetch the user configs easier. Add handling for polling user group namespace

* feat: implement the default user config list

* feat: add user group config handling

* chore: extra missed existing group

* refactor: use existing lookup for objects in wrappers so they don't overwrite missing values

* feat: add contacts expiry serialization/deserialization, more LGC, timestamps to add closed group encryption info (for latest tracking)

* refactor: change how expiration timer works for contacts, set the expiration timer for those conversations in handling contact configs

* feat: add expiration updates via config for contacts as well

* feat: add almost all group editing cases, need to hook into the thread deletion for groups in the user groups

* feat: open group joining should work now

* feat: add groups to configs for push

* fix: handling user group updates bug fix for closed groups instead of all groups

* fix: open group sync persistence

* feat: add in activity finish if recipient no longer exists (deleted thread) from sync

* feat: support avatar removal from shared library

* feat: support thread deletion and refactoring a lot of getOrCreateThread references to go via storage or assume they are correctly set to hook into the contact and volatile creation during thread creation

* fix: database update not deleting in certain circumstances, storage persisting and removing the volatile convo info for thread deletion / creation, NTS hidden getter values in shared library

* refactor: make update listener visibility package

* refactor: update kotlin

* feat: update dependencies and support outdated config messages, refactor config factory to return null configs if new configs not supported

* feat: update shared library to use priority only, fix compile errors, fix group member sync problem

* fix: compile error

* fix: profile avatar fixes for local user now that we aren't setting local user profile key

* Revert "fix: profile avatar fixes for local user now that we aren't setting local user profile key"

This reverts commit 3f569e3403.

* refactor: let the local number update recipient details in profile manager

* fix: don't recreate thread after leaving

* fix: fix up the duplicate thread creation in the message receive handler

* fix: fix the placeholder rendering on new messages, add in extra context logging for adding contacts and preventing new thread creation on new messages of various types

* feat: add test theme for xml layout previews

* feat: add shortened hex for session IDs throughout, replace nullable getName with null in underlying contacts for individual contacts, build shared lib with release mode, remove todo, fix broken unit test

* feat: setup android unit tests for verifying storage behaviours and state of shared configs

* feat: adding dependencies to try and get android tests working, fixing bug with initial config not syncing properly

* fix: remove hilt testing, add spy on app context storage field instead, update libsession-util to fixed sodium cmake branch

* refactor: use PR version of libsession-util to test cmake build

* fix: new build on normal repo

* feat: new libsession util commit

* refactor: remove the old custom build libsodium stuff from cmake

* feat: update libsession module

* fix: add legacy config subscription to the home activity to enable showing banner at any time

* fix: pinned status for communities and groups, group last read time being set to snodeapi.now on finish joining

* fix: some open group volatile convo fix for last read timer being set. Need to investigate further

* fix: prevent blocking local number

* fix: adding in more checks for open group inbox recipients before being saved to the shared configs. Prevent sending typing indicator for blocked users

* fix: add blocked check for read receipt and updating expiring messages

* fix: another contact recipient config library call removed for non-standard IDs

* fix: another ID check

* fix: don't process thread creation for user is sender && recipient (sync message) for message request responses

* refactor: mark as read on open and use less buffer time

* fix: finally fix the darn unread count issue by

* fix: removing debug logs, adding failure error handling logs for expiry message updater, properly using the message thread ID created for the expiring messages. Process the non-thread messages properly with await in BatchMessageReceiveJob

* fix: checking the last read open to message and make sure that scroll behaviour matches expected, fix the config sync job not deleting ALL old hashes only latest

* refactor: try to add a retry logic to config sync job in case of snode failure

* build: update submodule

* fix: remove user notifications for leaving group to prevent synced device issues, don't create thread in messages for new closed groups, includei nactive groups in the deletion queries for merging group configs

* feat: use blinded message count for banner also

* refactor: remove some logging, don't use blinded conversations in the list

* fix: don't set the read flag in update notifications, some roundabout logic for first loads and scrolling to last known positions

* refactor: merge changes, re-add the group check in unapproved messages

* fix: re-poll on fail in case that was breaking anything

* fix: pinning groups and notifying list listeners in threadDb.setPinned

* feat: add in TTL extension subrequest and builder, enable extending TTLs for all latest config messages in poll as subrequest

* feat: add block to the delete all message requests, only if they're not open group inbox contacts

* refactor: disable edit text for non contacts

* refactor: let the user display name return "You" for local user

* fix: prevent NTS self create thread on user view bind

* refactor: remove populate public key cache if needed call which seems unnecessary at that point, maybe UserView refs have changed since 2020

* refactor: use just first visible instead of completely visible, merge message sender changes

* fix: prevent block of users in delete all

* fix: self sync sync message failures for default values

* feat: update libsession-util, adjust docs, update mms and sms to use message sent timestamp instead of -1 for last read in the thread

* fix: some compile issues in tests and some TODOs for things to do before merge

* fix: handle recyclerview scrolled on scroll to first unread if it's the first load

* fix: added more migration code for deleting unnecessary threads and groups, fixed a post-migration last seen issue on last item (current read is now), comment out actual network sync while testing migrations

* feat: adding a force new configs flag and logic for timestamp handling / forced configs, fix issue with handling legacy messages

* refactor: re-add the sending of configs

* fix: don't add contacts if they don't exist in the profile manager

* [wip]
fix: trying to consolidate prof pic and key properly

* feat: add logs and fix compile issue with a themes.xml entry, add removing profile picture into logic for profile manager

* fix: force has sent for local user, only prevent setting last seen for open group recipients, allow empty user pics to trigger config sync in settings

* fix: nts threads

* fix: open group avatar loop for open groups we have left

* feat: add a wrapper hash to track home diff util changes for wrapper contact recipient info, add test for dirty state in double set

* feat: add a dump in there as well

* refactor: more test code refactor

* fix: update last seen if later than current

* fix: open group threads and avatar downloads

* fix: add max size and maybe fix the non-200 sub requests for batches (for 421s in particular)

* fix: open group comparison issues potentially, have to update some more outgoing message open group flags for visibility of details etc

* Updated to the latest libSession-util

* Updated logic to delete legacy groups when kicked/left

* Added the legacy group 'joined_at' value

* Replaced incorrect character in JNI

* Fixed an issue where the group keyPair was getting encoded incorrectly

* Updated the code to ignore outdated legacy group control message changes

* Updated the code to ignore messages invalidated by the config

* [Review] Updated the poller to process config messages before standard

* Cleaned up the outdated message logic

* Fixed inverted config dropping flags

* Fixed an issue where the joining a community would read all messages

Stopped using a reversed RecyclerView in all cases (caused the unread issue)
Updated the logic to jump to the newly sent message when sending a message (to be consistent with other platforms)
Updated the logic to refresh the DB unread count when the cursor receives an update

* Updated the conversation to highlight the first unread message on open

* Fixed a couple of bugs with the highlighting

* Fixed a bug where the user profile picture wasn't downloading correctly

* feat: add all namespaces to delete all messages request and signature verification data

* fix: merge namespace hashes for signature returned and

* fix: import correct scroll to bottom

* build: update version code and name

* fix: initial contact generation fix for existing blinded contacts

* fix: initial convo generation fix for existing blinded convos (?)

* fix: conversation unread not doing a check for standard ID prefix

* fix: thread ID not being created for legacy config messages

* fix: don't treat 404 as bad snode

* fix: don't add retrieve profile job if we have one for that address

* build: update build code

* fix: reduce attempts for downloading image, invert unreachable type check

* fix: attempting to fix preventing message processing if group thread is not active for closed groups and initial contact dump only allows conversations with thread, may need further optimisations though

* feat: Added an unread marker and search result focus highlighting

* fix: empty set in appropriate places for current closed groups

* build: update build version code

* fix: fix the notifications and request at appropriate time

* refactor: remove debug logging for thread create and delete

* build: update build number

* fix: new community doesn't break persisting config if the .add request fails

* build: trying to track down broken retrieve avatar job

* feat: update to latest libsession dev

* fix: maybe fix avatar download for new messages

* fix: 404s causing snode errors and trying to retrieve avatars that have already 404'd a lot

* fix: closed group creation sets thread date to formation timestamp

* build: update version code

* build: update version code

* build: remove debuggable release build

* fix: use new permissions for external attachments

* build: update version code

* chore: remove debug logs

* fix: tests and main thread blocking db fetch for path status view

* wip: trying to track down failure to mark conversation as read in delayed group add

* wip: add more logs for initial last Read sync of communities

* wip: maybe the volatile is being updated with 0 on batch message receive?

* fix: maybe syncing read statuses are working now

* chore: remove debug logs

* build: update build number

* fix: trying to improve performance

* fix: add close to banner

* refactor: hide seed reminder in preview

* build: update build number

* fix: maybe requires update thread no matter what

* fix: message request banner shows again

* fix: android tests work again and permissions

* fix: blocked contacts click handler being overridden by something

* Revert "fix: blocked contacts click handler being overridden by something"

This reverts commit 608572fc42.

* build: update build number

* refactor: remove unused dependencies and update minor for sqlcipher

* fix: actually do insert contact, because otherwise name doesn't get set properly

* fix: maybe fix scroll to bottom issue

* build: update build number

* fix: the message time and jump to message queries are more optimized

* fix: maybe fix the last seen issues

* build: update build number

* fix: pfp broken closed groups why

* fix: add admins and members as member list instead of just members

* fix: exclude lgc without membership > 1 and inactive explicitly

* fix: submodule update

* fix: compiles with removal of iterator erase

* fix: unread indicator updates properly in ConversationActivityV2

* fix: unread notifications clear and altered if any notifications exist (prevents clearing read notifications in conversation or on home screen)

* refactor: profile pictures kinda broken

* build: update build number

* refactor: remove full hash from log

* fix: isPinned threadDB call

* refactor: use mutex in all libsession native calls, change timestamp

* refactor: add basic support for blinded v2 prefixes

---------

Co-authored-by: Morgan Pretty <morgan.t.pretty@gmail.com>
This commit is contained in:
0x330a
2023-07-14 18:27:13 +10:00
committed by GitHub
parent 96ec733517
commit ac18f1cbfe
171 changed files with 8087 additions and 1448 deletions

View File

@@ -18,6 +18,7 @@ android {
dependencies {
implementation project(":libsignal")
implementation project(":libsession-util")
implementation project(":liblazysodium")
implementation "net.java.dev.jna:jna:5.8.0@aar"
implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlinVersion"
@@ -34,7 +35,6 @@ dependencies {
implementation 'com.annimon:stream:1.1.8'
implementation 'com.makeramen:roundedimageview:2.1.0'
implementation 'com.esotericsoftware:kryo:5.1.1'
implementation "com.google.protobuf:protobuf-java:$protobufVersion"
implementation "com.fasterxml.jackson.core:jackson-databind:$jacksonDatabindVersion"
implementation "com.github.oxen-io.session-android-curve-25519:curve25519-java:$curve25519Version"
implementation "com.squareup.okhttp3:okhttp:$okhttpVersion"
@@ -46,10 +46,6 @@ dependencies {
testImplementation 'org.assertj:assertj-core:3.11.1'
testImplementation "org.mockito:mockito-inline:4.0.0"
testImplementation "org.mockito.kotlin:mockito-kotlin:$mockitoKotlinVersion"
testImplementation 'org.powermock:powermock-api-mockito:1.6.1'
testImplementation 'org.powermock:powermock-module-junit4:1.6.1'
testImplementation 'org.powermock:powermock-module-junit4-rule:1.6.1'
testImplementation 'org.powermock:powermock-classloading-xstream:1.6.1'
testImplementation "androidx.test:core:$testCoreVersion"
testImplementation "androidx.arch.core:core-testing:2.1.0"
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutinesVersion"

View File

@@ -33,7 +33,7 @@ public class ResourceContactPhoto implements FallbackContactPhoto {
Drawable background = TextDrawable.builder().buildRound(" ", inverted ? Color.WHITE : color);
RoundedDrawable foreground = (RoundedDrawable) RoundedDrawable.fromDrawable(context.getResources().getDrawable(resourceId));
foreground.setScaleType(ImageView.ScaleType.CENTER_INSIDE);
foreground.setScaleType(ImageView.ScaleType.CENTER_CROP);
if (inverted) {
foreground.setColorFilter(color, PorterDuff.Mode.SRC_ATOP);

View File

@@ -2,12 +2,14 @@ package org.session.libsession.database
import android.content.Context
import android.net.Uri
import network.loki.messenger.libsession_util.ConfigBase
import org.session.libsession.messaging.BlindedIdMapping
import org.session.libsession.messaging.calls.CallMessageType
import org.session.libsession.messaging.contacts.Contact
import org.session.libsession.messaging.jobs.AttachmentUploadJob
import org.session.libsession.messaging.jobs.Job
import org.session.libsession.messaging.jobs.MessageSendJob
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.control.MessageRequestResponse
@@ -30,6 +32,7 @@ import org.session.libsession.utilities.recipients.Recipient.RecipientSettings
import org.session.libsignal.crypto.ecc.ECKeyPair
import org.session.libsignal.messages.SignalServiceAttachmentPointer
import org.session.libsignal.messages.SignalServiceGroup
import network.loki.messenger.libsession_util.util.Contact as LibSessionContact
interface StorageProtocol {
@@ -38,6 +41,9 @@ interface StorageProtocol {
fun getUserX25519KeyPair(): ECKeyPair
fun getUserProfile(): Profile
fun setProfileAvatar(recipient: Recipient, profileAvatar: String?)
fun setProfilePicture(recipient: Recipient, newProfilePicture: String?, newProfileKey: ByteArray?)
fun setUserProfilePicture(newProfilePicture: String?, newProfileKey: ByteArray?)
fun clearUserPic()
// Signal
fun getOrGenerateRegistrationID(): Int
@@ -50,8 +56,10 @@ interface StorageProtocol {
fun getMessageSendJob(messageSendJobID: String): MessageSendJob?
fun getMessageReceiveJob(messageReceiveJobID: String): Job?
fun getGroupAvatarDownloadJob(server: String, room: String, imageId: String?): Job?
fun getConfigSyncJob(destination: Destination): Job?
fun resumeMessageSendJobIfNeeded(messageSendJobID: String)
fun isJobCanceled(job: Job): Boolean
fun cancelPendingMessageSendJobs(threadID: Long)
// Authorization
fun getAuthToken(room: String, server: String): String?
@@ -67,7 +75,7 @@ interface StorageProtocol {
fun updateOpenGroup(openGroup: OpenGroup)
fun getOpenGroup(threadId: Long): OpenGroup?
fun addOpenGroup(urlAsString: String): OpenGroupApi.RoomInfo?
fun onOpenGroupAdded(server: String)
fun onOpenGroupAdded(server: String, room: String)
fun hasBackgroundGroupAddJob(groupJoinUrl: String): Boolean
fun setOpenGroupServerMessageID(messageID: Long, serverID: Long, threadID: Long, isSms: Boolean)
fun getOpenGroup(room: String, server: String): OpenGroup?
@@ -119,6 +127,8 @@ interface StorageProtocol {
// Closed Groups
fun getGroup(groupID: String): GroupRecord?
fun createGroup(groupID: String, title: String?, members: List<Address>, avatar: SignalServiceAttachmentPointer?, relay: String?, admins: List<Address>, formationTimestamp: Long)
fun createInitialConfigGroup(groupPublicKey: String, name: String, members: Map<String, Boolean>, formationTimestamp: Long, encryptionKeyPair: ECKeyPair)
fun updateGroupConfig(groupPublicKey: String)
fun isGroupActive(groupPublicKey: String): Boolean
fun setActive(groupID: String, value: Boolean)
fun getZombieMembers(groupID: String): Set<String>
@@ -129,7 +139,7 @@ interface StorageProtocol {
fun getAllActiveClosedGroupPublicKeys(): Set<String>
fun addClosedGroupPublicKey(groupPublicKey: String)
fun removeClosedGroupPublicKey(groupPublicKey: String)
fun addClosedGroupEncryptionKeyPair(encryptionKeyPair: ECKeyPair, groupPublicKey: String)
fun addClosedGroupEncryptionKeyPair(encryptionKeyPair: ECKeyPair, groupPublicKey: String, timestamp: Long)
fun removeAllClosedGroupEncryptionKeyPairs(groupPublicKey: String)
fun insertIncomingInfoMessage(context: Context, senderPublicKey: String, groupID: String, type: SignalServiceGroup.Type,
name: String, members: Collection<String>, admins: Collection<String>, sentTimestamp: Long)
@@ -140,18 +150,20 @@ interface StorageProtocol {
fun getLatestClosedGroupEncryptionKeyPair(groupPublicKey: String): ECKeyPair?
fun updateFormationTimestamp(groupID: String, formationTimestamp: Long)
fun updateTimestampUpdated(groupID: String, updatedTimestamp: Long)
fun setExpirationTimer(groupID: String, duration: Int)
fun setExpirationTimer(address: String, duration: Int)
// Groups
fun getAllGroups(): List<GroupRecord>
fun getAllGroups(includeInactive: Boolean): List<GroupRecord>
// Settings
fun setProfileSharing(address: Address, value: Boolean)
// Thread
fun getOrCreateThreadIdFor(address: Address): Long
fun getOrCreateThreadIdFor(publicKey: String, groupPublicKey: String?, openGroupID: String?): Long
fun getThreadIdFor(publicKey: String, groupPublicKey: String?, openGroupID: String?, createThread: Boolean): Long?
fun getThreadId(publicKeyOrOpenGroupID: String): Long?
fun getThreadId(openGroup: OpenGroup): Long?
fun getThreadId(address: Address): Long?
fun getThreadId(recipient: Recipient): Long?
fun getThreadIdForMms(mmsId: Long): Long
@@ -159,7 +171,10 @@ interface StorageProtocol {
fun trimThread(threadID: Long, threadLimit: Int)
fun trimThreadBefore(threadID: Long, timestamp: Long)
fun getMessageCount(threadID: Long): Long
fun deleteConversation(threadId: Long)
fun setPinned(threadID: Long, isPinned: Boolean)
fun isPinned(threadID: Long): Boolean
fun deleteConversation(threadID: Long)
fun setThreadDate(threadId: Long, newDate: Long)
// Contacts
fun getContactWithSessionID(sessionID: String): Contact?
@@ -167,6 +182,7 @@ interface StorageProtocol {
fun setContact(contact: Contact)
fun getRecipientForThread(threadId: Long): Recipient?
fun getRecipientSettings(address: Address): RecipientSettings?
fun addLibSessionContacts(contacts: List<LibSessionContact>)
fun addContacts(contacts: List<ConfigurationMessage.Contact>)
// Attachments
@@ -177,13 +193,14 @@ interface StorageProtocol {
/**
* Returns the ID of the `TSIncomingMessage` that was constructed.
*/
fun persist(message: VisibleMessage, quotes: QuoteModel?, linkPreview: List<LinkPreview?>, groupPublicKey: String?, openGroupID: String?, attachments: List<Attachment>, runIncrement: Boolean, runThreadUpdate: Boolean): Long?
fun markConversationAsRead(threadId: Long, updateLastSeen: Boolean)
fun incrementUnread(threadId: Long, amount: Int, unreadMentionAmount: Int)
fun persist(message: VisibleMessage, quotes: QuoteModel?, linkPreview: List<LinkPreview?>, groupPublicKey: String?, openGroupID: String?, attachments: List<Attachment>, runThreadUpdate: Boolean): Long?
fun markConversationAsRead(threadId: Long, lastSeenTime: Long, force: Boolean = false)
fun getLastSeen(threadId: Long): Long
fun updateThread(threadId: Long, unarchive: Boolean)
fun insertDataExtractionNotificationMessage(senderPublicKey: String, message: DataExtractionNotificationInfoMessage, sentTimestamp: Long)
fun insertMessageRequestResponse(response: MessageRequestResponse)
fun setRecipientApproved(recipient: Recipient, approved: Boolean)
fun getRecipientApproved(address: Address): Boolean
fun setRecipientApprovedMe(recipient: Recipient, approvedMe: Boolean)
fun insertCallMessage(senderPublicKey: String, callMessageType: CallMessageType, sentTimestamp: Long)
fun conversationHasOutgoing(userPublicKey: String): Boolean
@@ -203,6 +220,12 @@ interface StorageProtocol {
fun removeReaction(emoji: String, messageTimestamp: Long, author: String, notifyUnread: Boolean)
fun updateReactionIfNeeded(message: Message, sender: String, openGroupSentTimestamp: Long)
fun deleteReactions(messageId: Long, mms: Boolean)
fun unblock(toUnblock: Iterable<Recipient>)
fun setBlocked(recipients: Iterable<Recipient>, isBlocked: Boolean, fromConfigUpdate: Boolean = false)
fun setRecipientHash(recipient: Recipient, recipientHash: String?)
fun blockedContacts(): List<Recipient>
// Shared configs
fun notifyConfigUpdates(forConfigObject: ConfigBase)
fun conversationInConfig(publicKey: String?, groupPublicKey: String?, openGroupId: String?, visibleOnly: Boolean): Boolean
fun canPerformConfigChange(variant: String, publicKey: String, changeTimestampMs: Long): Boolean
}

View File

@@ -4,12 +4,14 @@ import android.content.Context
import com.goterl.lazysodium.utils.KeyPair
import org.session.libsession.database.MessageDataProvider
import org.session.libsession.database.StorageProtocol
import org.session.libsession.utilities.ConfigFactoryProtocol
class MessagingModuleConfiguration(
val context: Context,
val storage: StorageProtocol,
val messageDataProvider: MessageDataProvider,
val getUserED25519KeyPair: ()-> KeyPair?
val getUserED25519KeyPair: () -> KeyPair?,
val configFactory: ConfigFactoryProtocol
) {
companion object {

View File

@@ -42,7 +42,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val databaseMessageID: Long)
private val TS_INCOMING_MESSAGE_ID_KEY = "tsIncoming_message_id"
}
override fun execute(dispatcherName: String) {
override suspend fun execute(dispatcherName: String) {
val storage = MessagingModuleConfiguration.shared.storage
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val threadID = storage.getThreadIdForMms(databaseMessageID)

View File

@@ -16,7 +16,11 @@ import org.session.libsession.utilities.DecodedAudio
import org.session.libsession.utilities.InputStreamMediaDataSource
import org.session.libsession.utilities.UploadResult
import org.session.libsignal.messages.SignalServiceAttachmentStream
import org.session.libsignal.streams.*
import org.session.libsignal.streams.AttachmentCipherOutputStream
import org.session.libsignal.streams.AttachmentCipherOutputStreamFactory
import org.session.libsignal.streams.DigestingRequestBody
import org.session.libsignal.streams.PaddingInputStream
import org.session.libsignal.streams.PlaintextOutputStreamFactory
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.PushAttachmentData
import org.session.libsignal.utilities.Util
@@ -45,7 +49,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
private val MESSAGE_SEND_JOB_ID_KEY = "message_send_job_id"
}
override fun execute(dispatcherName: String) {
override suspend fun execute(dispatcherName: String) {
try {
val storage = MessagingModuleConfiguration.shared.storage
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider

View File

@@ -3,9 +3,7 @@ package org.session.libsession.messaging.jobs
import okhttp3.HttpUrl
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.open_groups.OpenGroup
import org.session.libsession.messaging.open_groups.OpenGroupApi
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.utilities.GroupUtil
import org.session.libsession.utilities.OpenGroupUrlParser
import org.session.libsignal.utilities.Log
@@ -29,7 +27,7 @@ class BackgroundGroupAddJob(val joinUrl: String): Job {
return "$server.$room"
}
override fun execute(dispatcherName: String) {
override suspend fun execute(dispatcherName: String) {
try {
val openGroup = OpenGroupUrlParser.parseUrl(joinUrl)
val storage = MessagingModuleConfiguration.shared.storage
@@ -40,8 +38,7 @@ class BackgroundGroupAddJob(val joinUrl: String): Job {
return
}
storage.addOpenGroup(openGroup.joinUrl())
Log.d(KEY, "onOpenGroupAdded(${openGroup.server})")
storage.onOpenGroupAdded(openGroup.server)
storage.onOpenGroupAdded(openGroup.server, openGroup.room)
} catch (e: Exception) {
Log.e("OpenGroupDispatcher", "Failed to add group because",e)
delegate?.handleJobFailed(this, dispatcherName, e)

View File

@@ -7,15 +7,26 @@ import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.task
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.CallMessage
import org.session.libsession.messaging.messages.control.ClosedGroupControlMessage
import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.control.DataExtractionNotification
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.messages.control.MessageRequestResponse
import org.session.libsession.messaging.messages.control.ReadReceipt
import org.session.libsession.messaging.messages.control.SharedConfigurationMessage
import org.session.libsession.messaging.messages.control.TypingIndicator
import org.session.libsession.messaging.messages.control.UnsendRequest
import org.session.libsession.messaging.messages.visible.ParsedMessage
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.open_groups.OpenGroupApi
import org.session.libsession.messaging.sending_receiving.*
import org.session.libsession.messaging.sending_receiving.MessageReceiver
import org.session.libsession.messaging.sending_receiving.handle
import org.session.libsession.messaging.sending_receiving.handleOpenGroupReactions
import org.session.libsession.messaging.sending_receiving.handleUnsendRequest
import org.session.libsession.messaging.sending_receiving.handleVisibleMessage
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.messaging.utilities.SessionId
import org.session.libsession.messaging.utilities.SodiumUtilities
@@ -49,6 +60,9 @@ class BatchMessageReceiveJob(
const val BATCH_DEFAULT_NUMBER = 512
// used for processing messages that don't have a thread and shouldn't create one
const val NO_THREAD_MAPPING = -1L
// Keys used for database storage
private val NUM_MESSAGES_KEY = "numMessages"
private val DATA_KEY = "data"
@@ -57,16 +71,27 @@ class BatchMessageReceiveJob(
private val OPEN_GROUP_ID_KEY = "open_group_id"
}
private fun getThreadId(message: Message, storage: StorageProtocol): Long {
val senderOrSync = when (message) {
is VisibleMessage -> message.syncTarget ?: message.sender!!
is ExpirationTimerUpdate -> message.syncTarget ?: message.sender!!
else -> message.sender!!
private fun shouldCreateThread(parsedMessage: ParsedMessage): Boolean {
val message = parsedMessage.message
if (message is VisibleMessage) return true
else { // message is control message otherwise
return when(message) {
is SharedConfigurationMessage -> false
is ClosedGroupControlMessage -> false // message.kind is ClosedGroupControlMessage.Kind.New && !message.isSenderSelf
is DataExtractionNotification -> false
is MessageRequestResponse -> false
is ExpirationTimerUpdate -> false
is ConfigurationMessage -> false
is TypingIndicator -> false
is UnsendRequest -> false
is ReadReceipt -> false
is CallMessage -> false // TODO: maybe
else -> false // shouldn't happen, or I guess would be Visible
}
}
return storage.getOrCreateThreadIdFor(senderOrSync, message.groupPublicKey, openGroupID)
}
override fun execute(dispatcherName: String) {
override suspend fun execute(dispatcherName: String) {
executeAsync(dispatcherName).get()
}
@@ -77,15 +102,16 @@ class BatchMessageReceiveJob(
val context = MessagingModuleConfiguration.shared.context
val localUserPublicKey = storage.getUserPublicKey()
val serverPublicKey = openGroupID?.let { storage.getOpenGroupPublicKey(it.split(".").dropLast(1).joinToString(".")) }
val currentClosedGroups = storage.getAllActiveClosedGroupPublicKeys()
// parse and collect IDs
messages.forEach { messageParameters ->
val (data, serverHash, openGroupMessageServerID) = messageParameters
try {
val (message, proto) = MessageReceiver.parse(data, openGroupMessageServerID, openGroupPublicKey = serverPublicKey)
val (message, proto) = MessageReceiver.parse(data, openGroupMessageServerID, openGroupPublicKey = serverPublicKey, currentClosedGroups = currentClosedGroups)
message.serverHash = serverHash
val threadID = getThreadId(message, storage)
val parsedParams = ParsedMessage(messageParameters, message, proto)
val threadID = Message.getThreadId(message, openGroupID, storage, shouldCreateThread(parsedParams)) ?: NO_THREAD_MAPPING
if (!threadMap.containsKey(threadID)) {
threadMap[threadID] = mutableListOf(parsedParams)
} else {
@@ -115,77 +141,101 @@ class BatchMessageReceiveJob(
// iterate over threads and persist them (persistence is the longest constant in the batch process operation)
runBlocking(Dispatchers.IO) {
val deferredThreadMap = threadMap.entries.map { (threadId, messages) ->
async {
// The LinkedHashMap should preserve insertion order
val messageIds = linkedMapOf<Long, Pair<Boolean, Boolean>>()
messages.forEach { (parameters, message, proto) ->
try {
when (message) {
is VisibleMessage -> {
val messageId = MessageReceiver.handleVisibleMessage(message, proto, openGroupID,
runIncrement = false,
runThreadUpdate = false,
runProfileUpdate = true
)
if (messageId != null && message.reaction == null) {
val isUserBlindedSender = message.sender == serverPublicKey?.let { SodiumUtilities.blindedKeyPair(it, MessagingModuleConfiguration.shared.getUserED25519KeyPair()!!) }?.let { SessionId(
IdPrefix.BLINDED, it.publicKey.asBytes).hexString }
messageIds[messageId] = Pair(
(message.sender == localUserPublicKey || isUserBlindedSender),
message.hasMention
fun processMessages(threadId: Long, messages: List<ParsedMessage>) = async {
// The LinkedHashMap should preserve insertion order
val messageIds = linkedMapOf<Long, Pair<Boolean, Boolean>>()
val myLastSeen = storage.getLastSeen(threadId)
var newLastSeen = if (myLastSeen == -1L) 0 else myLastSeen
messages.forEach { (parameters, message, proto) ->
try {
when (message) {
is VisibleMessage -> {
val isUserBlindedSender =
message.sender == serverPublicKey?.let {
SodiumUtilities.blindedKeyPair(
it,
MessagingModuleConfiguration.shared.getUserED25519KeyPair()!!
)
}?.let {
SessionId(
IdPrefix.BLINDED, it.publicKey.asBytes
).hexString
}
parameters.openGroupMessageServerID?.let {
MessageReceiver.handleOpenGroupReactions(threadId, it, parameters.reactions)
val sentTimestamp = message.sentTimestamp!!
if (message.sender == localUserPublicKey || isUserBlindedSender) {
if (sentTimestamp > newLastSeen) {
newLastSeen =
sentTimestamp // use sent timestamp here since that is technically the last one we have
}
}
val messageId = MessageReceiver.handleVisibleMessage(
message, proto, openGroupID, threadId,
runThreadUpdate = false,
runProfileUpdate = true
)
is UnsendRequest -> {
val deletedMessageId = MessageReceiver.handleUnsendRequest(message)
// If we removed a message then ensure it isn't in the 'messageIds'
if (deletedMessageId != null) {
messageIds.remove(deletedMessageId)
}
if (messageId != null && message.reaction == null) {
messageIds[messageId] = Pair(
(message.sender == localUserPublicKey || isUserBlindedSender),
message.hasMention
)
}
parameters.openGroupMessageServerID?.let {
MessageReceiver.handleOpenGroupReactions(
threadId,
it,
parameters.reactions
)
}
}
else -> MessageReceiver.handle(message, proto, openGroupID)
}
} catch (e: Exception) {
Log.e(TAG, "Couldn't process message (id: $id)", e)
if (e is MessageReceiver.Error && !e.isRetryable) {
Log.e(TAG, "Message failed permanently (id: $id)", e)
} else {
Log.e(TAG, "Message failed (id: $id)", e)
failures += parameters
is UnsendRequest -> {
val deletedMessageId =
MessageReceiver.handleUnsendRequest(message)
// If we removed a message then ensure it isn't in the 'messageIds'
if (deletedMessageId != null) {
messageIds.remove(deletedMessageId)
}
}
else -> MessageReceiver.handle(message, proto, threadId, openGroupID)
}
} catch (e: Exception) {
Log.e(TAG, "Couldn't process message (id: $id)", e)
if (e is MessageReceiver.Error && !e.isRetryable) {
Log.e(TAG, "Message failed permanently (id: $id)", e)
} else {
Log.e(TAG, "Message failed (id: $id)", e)
failures += parameters
}
}
// increment unreads, notify, and update thread
val unreadFromMine = messageIds.map { it.value.first }.indexOfLast { it }
var trueUnreadCount = messageIds.filter { !it.value.first }.size
var trueUnreadMentionCount = messageIds.filter { !it.value.first && it.value.second }.size
if (unreadFromMine >= 0) {
storage.markConversationAsRead(threadId, false)
val trueUnreadIds = messageIds.keys.toList().subList(unreadFromMine + 1, messageIds.keys.count())
trueUnreadCount = trueUnreadIds.size
trueUnreadMentionCount = messageIds
.filter { trueUnreadIds.contains(it.key) && !it.value.first && it.value.second }
.size
}
if (trueUnreadCount > 0) {
storage.incrementUnread(threadId, trueUnreadCount, trueUnreadMentionCount)
}
storage.updateThread(threadId, true)
SSKEnvironment.shared.notificationManager.updateNotification(context, threadId)
}
// increment unreads, notify, and update thread
// last seen will be the current last seen if not changed (re-computes the read counts for thread record)
// might have been updated from a different thread at this point
val currentLastSeen = storage.getLastSeen(threadId).let { if (it == -1L) 0 else it }
if (currentLastSeen > newLastSeen) {
newLastSeen = currentLastSeen
}
if (newLastSeen > 0 || currentLastSeen == 0L) {
storage.markConversationAsRead(threadId, newLastSeen, force = true)
}
storage.updateThread(threadId, true)
SSKEnvironment.shared.notificationManager.updateNotification(context, threadId)
}
val withoutDefault = threadMap.entries.filter { it.key != NO_THREAD_MAPPING }
val noThreadMessages = threadMap[NO_THREAD_MAPPING] ?: listOf()
val deferredThreadMap = withoutDefault.map { (threadId, messages) ->
processMessages(threadId, messages)
}
// await all thread processing
deferredThreadMap.awaitAll()
if (noThreadMessages.isNotEmpty()) {
processMessages(NO_THREAD_MAPPING, noThreadMessages).await()
}
}
if (failures.isEmpty()) {
handleSuccess(dispatcherName)

View File

@@ -0,0 +1,206 @@
package org.session.libsession.messaging.jobs
import network.loki.messenger.libsession_util.ConfigBase
import network.loki.messenger.libsession_util.ConfigBase.Companion.protoKindFor
import nl.komponents.kovenant.functional.bind
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.Destination
import org.session.libsession.messaging.messages.control.SharedConfigurationMessage
import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.utilities.Log
import java.util.concurrent.atomic.AtomicBoolean
// only contact (self) and closed group destinations will be supported
data class ConfigurationSyncJob(val destination: Destination): Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
override val maxFailureCount: Int = 10
val shouldRunAgain = AtomicBoolean(false)
override suspend fun execute(dispatcherName: String) {
val storage = MessagingModuleConfiguration.shared.storage
val forcedConfig = TextSecurePreferences.hasForcedNewConfig(MessagingModuleConfiguration.shared.context)
val currentTime = SnodeAPI.nowWithOffset
val userEdKeyPair = MessagingModuleConfiguration.shared.getUserED25519KeyPair()
val userPublicKey = storage.getUserPublicKey()
val delegate = delegate
if (destination is Destination.ClosedGroup // TODO: closed group configs will be handled in closed group feature
// if we haven't enabled the new configs don't run
|| !ConfigBase.isNewConfigEnabled(forcedConfig, currentTime)
// if we don't have a user ed key pair for signing updates
|| userEdKeyPair == null
// this will be useful to not handle null delegate cases
|| delegate == null
// check our local identity key exists
|| userPublicKey.isNullOrEmpty()
// don't allow pushing configs for non-local user
|| (destination is Destination.Contact && destination.publicKey != userPublicKey)
) {
Log.w(TAG, "No need to run config sync job, TODO")
return delegate?.handleJobSucceeded(this, dispatcherName) ?: Unit
}
// configFactory singleton instance will come in handy for modifying hashes and fetching configs for namespace etc
val configFactory = MessagingModuleConfiguration.shared.configFactory
// get latest states, filter out configs that don't need push
val configsRequiringPush = configFactory.getUserConfigs().filter { config -> config.needsPush() }
// don't run anything if we don't need to push anything
if (configsRequiringPush.isEmpty()) return delegate.handleJobSucceeded(this, dispatcherName)
// need to get the current hashes before we call `push()`
val toDeleteHashes = mutableListOf<String>()
// allow null results here so the list index matches configsRequiringPush
val sentTimestamp: Long = SnodeAPI.nowWithOffset
val batchObjects: List<Pair<SharedConfigurationMessage, SnodeAPI.SnodeBatchRequestInfo>?> = configsRequiringPush.map { config ->
val (data, seqNo, obsoleteHashes) = config.push()
toDeleteHashes += obsoleteHashes
SharedConfigurationMessage(config.protoKindFor(), data, seqNo) to config
}.map { (message, config) ->
// return a list of batch request objects
val snodeMessage = MessageSender.buildWrappedMessageToSnode(destination, message, true)
val authenticated = SnodeAPI.buildAuthenticatedStoreBatchInfo(
destination.destinationPublicKey(),
config.configNamespace(),
snodeMessage
) ?: return@map null // this entry will be null otherwise
message to authenticated // to keep track of seqNo for calling confirmPushed later
}
val toDeleteRequest = toDeleteHashes.let { toDeleteFromAllNamespaces ->
if (toDeleteFromAllNamespaces.isEmpty()) null
else SnodeAPI.buildAuthenticatedDeleteBatchInfo(destination.destinationPublicKey(), toDeleteFromAllNamespaces)
}
if (batchObjects.any { it == null }) {
// stop running here, something like a signing error occurred
return delegate.handleJobFailedPermanently(this, dispatcherName, NullPointerException("One or more requests had a null batch request info"))
}
val allRequests = mutableListOf<SnodeAPI.SnodeBatchRequestInfo>()
allRequests += batchObjects.requireNoNulls().map { (_, request) -> request }
// add in the deletion if we have any hashes
if (toDeleteRequest != null) {
allRequests += toDeleteRequest
Log.d(TAG, "Including delete request for current hashes")
}
val batchResponse = SnodeAPI.getSingleTargetSnode(destination.destinationPublicKey()).bind { snode ->
SnodeAPI.getRawBatchResponse(
snode,
destination.destinationPublicKey(),
allRequests,
sequence = true
)
}
try {
val rawResponses = batchResponse.get()
@Suppress("UNCHECKED_CAST")
val responseList = (rawResponses["results"] as List<RawResponse>)
// we are always adding in deletions at the end
val deletionResponse = if (toDeleteRequest != null && responseList.isNotEmpty()) responseList.last() else null
val deletedHashes = deletionResponse?.let {
@Suppress("UNCHECKED_CAST")
// get the sub-request body
(deletionResponse["body"] as? RawResponse)?.let { body ->
// get the swarm dict
body["swarm"] as? RawResponse
}?.mapValues { (_, swarmDict) ->
// get the deleted values from dict
((swarmDict as? RawResponse)?.get("deleted") as? List<String>)?.toSet() ?: emptySet()
}?.values?.reduce { acc, strings ->
// create an intersection of all deleted hashes (common between all swarm nodes)
acc intersect strings
}
} ?: emptySet()
// at this point responseList index should line up with configsRequiringPush index
configsRequiringPush.forEachIndexed { index, config ->
val (toPushMessage, _) = batchObjects[index]!!
val response = responseList[index]
val responseBody = response["body"] as? RawResponse
val insertHash = responseBody?.get("hash") as? String ?: run {
Log.w(TAG, "No hash returned for the configuration in namespace ${config.configNamespace()}")
return@forEachIndexed
}
Log.d(TAG, "Hash ${insertHash.take(4)} returned from store request for new config")
// confirm pushed seqno
val thisSeqNo = toPushMessage.seqNo
config.confirmPushed(thisSeqNo, insertHash)
Log.d(TAG, "Successfully removed the deleted hashes from ${config.javaClass.simpleName}")
// dump and write config after successful
if (config.needsDump()) { // usually this will be true?
configFactory.persist(config, toPushMessage.sentTimestamp ?: sentTimestamp)
}
}
} catch (e: Exception) {
Log.e(TAG, "Error performing batch request", e)
return delegate.handleJobFailed(this, dispatcherName, e)
}
delegate.handleJobSucceeded(this, dispatcherName)
if (shouldRunAgain.get() && storage.getConfigSyncJob(destination) == null) {
// reschedule if something has updated since we started this job
JobQueue.shared.add(ConfigurationSyncJob(destination))
}
}
fun Destination.destinationPublicKey(): String = when (this) {
is Destination.Contact -> publicKey
is Destination.ClosedGroup -> groupPublicKey
else -> throw NullPointerException("Not public key for this destination")
}
override fun serialize(): Data {
val (type, address) = when (destination) {
is Destination.Contact -> CONTACT_TYPE to destination.publicKey
is Destination.ClosedGroup -> GROUP_TYPE to destination.groupPublicKey
else -> return Data.EMPTY
}
return Data.Builder()
.putInt(DESTINATION_TYPE_KEY, type)
.putString(DESTINATION_ADDRESS_KEY, address)
.build()
}
override fun getFactoryKey(): String = KEY
companion object {
const val TAG = "ConfigSyncJob"
const val KEY = "ConfigSyncJob"
// Keys used for DB storage
const val DESTINATION_ADDRESS_KEY = "destinationAddress"
const val DESTINATION_TYPE_KEY = "destinationType"
// type mappings
const val CONTACT_TYPE = 1
const val GROUP_TYPE = 2
}
class Factory: Job.Factory<ConfigurationSyncJob> {
override fun create(data: Data): ConfigurationSyncJob? {
if (!data.hasInt(DESTINATION_TYPE_KEY) || !data.hasString(DESTINATION_ADDRESS_KEY)) return null
val address = data.getString(DESTINATION_ADDRESS_KEY)
val destination = when (data.getInt(DESTINATION_TYPE_KEY)) {
CONTACT_TYPE -> Destination.Contact(address)
GROUP_TYPE -> Destination.ClosedGroup(address)
else -> return null
}
return ConfigurationSyncJob(destination)
}
}
}

View File

@@ -13,14 +13,18 @@ class GroupAvatarDownloadJob(val server: String, val room: String, val imageId:
override var failureCount: Int = 0
override val maxFailureCount: Int = 10
override fun execute(dispatcherName: String) {
override suspend fun execute(dispatcherName: String) {
if (imageId == null) {
delegate?.handleJobFailedPermanently(this, dispatcherName, Exception("GroupAvatarDownloadJob now requires imageId"))
return
}
val storage = MessagingModuleConfiguration.shared.storage
val storedImageId = storage.getOpenGroup(room, server)?.imageId
val openGroup = storage.getOpenGroup(room, server)
if (openGroup == null || storage.getThreadId(openGroup) == null) {
delegate?.handleJobFailedPermanently(this, dispatcherName, Exception("GroupAvatarDownloadJob openGroup is null"))
return
}
val storedImageId = openGroup.imageId
if (storedImageId == null || storedImageId != imageId) {
delegate?.handleJobFailedPermanently(this, dispatcherName, Exception("GroupAvatarDownloadJob imageId does not match the OpenGroup"))

View File

@@ -17,7 +17,7 @@ interface Job {
internal const val MAX_BUFFER_SIZE = 1_000_000 // bytes
}
fun execute(dispatcherName: String)
suspend fun execute(dispatcherName: String)
fun serialize(): Data

View File

@@ -94,7 +94,7 @@ class JobQueue : JobDelegate {
}
}
private fun Job.process(dispatcherName: String) {
private suspend fun Job.process(dispatcherName: String) {
Log.d(dispatcherName,"processJob: ${javaClass.simpleName} (id: $id)")
delegate = this@JobQueue
@@ -122,7 +122,7 @@ class JobQueue : JobDelegate {
while (isActive) {
when (val job = queue.receive()) {
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob -> {
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob, is ConfigurationSyncJob -> {
txQueue.send(job)
}
is RetrieveProfileAvatarJob,
@@ -226,6 +226,7 @@ class JobQueue : JobDelegate {
BackgroundGroupAddJob.KEY,
OpenGroupDeleteJob.KEY,
RetrieveProfileAvatarJob.KEY,
ConfigurationSyncJob.KEY,
)
allJobTypes.forEach { type ->
resumePendingJobs(type)

View File

@@ -3,6 +3,7 @@ package org.session.libsession.messaging.jobs
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.sending_receiving.MessageReceiver
import org.session.libsession.messaging.sending_receiving.handle
import org.session.libsession.messaging.utilities.Data
@@ -25,20 +26,22 @@ class MessageReceiveJob(val data: ByteArray, val serverHash: String? = null, val
private val OPEN_GROUP_ID_KEY = "open_group_id"
}
override fun execute(dispatcherName: String) {
override suspend fun execute(dispatcherName: String) {
executeAsync(dispatcherName).get()
}
fun executeAsync(dispatcherName: String): Promise<Unit, Exception> {
val deferred = deferred<Unit, Exception>()
try {
val isRetry: Boolean = failureCount != 0
val storage = MessagingModuleConfiguration.shared.storage
val serverPublicKey = openGroupID?.let {
MessagingModuleConfiguration.shared.storage.getOpenGroupPublicKey(it.split(".").dropLast(1).joinToString("."))
storage.getOpenGroupPublicKey(it.split(".").dropLast(1).joinToString("."))
}
val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID, openGroupPublicKey = serverPublicKey)
val currentClosedGroups = storage.getAllActiveClosedGroupPublicKeys()
val (message, proto) = MessageReceiver.parse(this.data, this.openGroupMessageServerID, openGroupPublicKey = serverPublicKey, currentClosedGroups = currentClosedGroups)
val threadId = Message.getThreadId(message, this.openGroupID, storage, false)
message.serverHash = serverHash
MessageReceiver.handle(message, proto, this.openGroupID)
MessageReceiver.handle(message, proto, threadId ?: -1, this.openGroupID)
this.handleSuccess(dispatcherName)
deferred.resolve(Unit)
} catch (e: Exception) {

View File

@@ -10,7 +10,6 @@ import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.OnionRequestAPI
import org.session.libsignal.utilities.HTTP
import org.session.libsignal.utilities.Log
@@ -33,7 +32,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
private val DESTINATION_KEY = "destination"
}
override fun execute(dispatcherName: String) {
override suspend fun execute(dispatcherName: String) {
val messageDataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val message = message as? VisibleMessage
val storage = MessagingModuleConfiguration.shared.storage
@@ -65,7 +64,8 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
return
} // Wait for all attachments to upload before continuing
}
val promise = MessageSender.send(this.message, this.destination).success {
val isSync = destination is Destination.Contact && destination.publicKey == sender
val promise = MessageSender.send(this.message, this.destination, isSync).success {
this.handleSuccess(dispatcherName)
}.fail { exception ->
var logStacktrace = true

View File

@@ -8,15 +8,13 @@ import okhttp3.MediaType
import okhttp3.Request
import okhttp3.RequestBody
import org.session.libsession.messaging.jobs.Job.Companion.MAX_BUFFER_SIZE
import org.session.libsession.messaging.sending_receiving.notifications.PushNotificationAPI
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.SnodeMessage
import org.session.libsession.snode.OnionRequestAPI
import org.session.libsession.snode.SnodeMessage
import org.session.libsession.snode.Version
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.JsonUtil
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.retryIfNeeded
class NotifyPNServerJob(val message: SnodeMessage) : Job {
@@ -32,7 +30,7 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
private val MESSAGE_KEY = "message"
}
override fun execute(dispatcherName: String) {
override suspend fun execute(dispatcherName: String) {
val server = PushNotificationAPI.server
val parameters = mapOf( "data" to message.data, "send_to" to message.recipient )
val url = "${server}/notify"

View File

@@ -19,7 +19,7 @@ class OpenGroupDeleteJob(private val messageServerIds: LongArray, private val th
override var failureCount: Int = 0
override val maxFailureCount: Int = 1
override fun execute(dispatcherName: String) {
override suspend fun execute(dispatcherName: String) {
val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider
val numberToDelete = messageServerIds.size
Log.d(TAG, "Deleting $numberToDelete messages")

View File

@@ -1,16 +1,14 @@
package org.session.libsession.messaging.jobs
import android.text.TextUtils
import org.session.libsession.avatars.AvatarHelper
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.utilities.Address
import org.session.libsession.utilities.DownloadUtilities.downloadFile
import org.session.libsession.utilities.TextSecurePreferences.Companion.setProfileAvatarId
import org.session.libsession.utilities.TextSecurePreferences.Companion.setProfilePictureURL
import org.session.libsession.utilities.Util.copy
import org.session.libsession.utilities.Util.equals
import org.session.libsession.utilities.Address
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsession.utilities.TextSecurePreferences.Companion.setProfilePictureURL
import org.session.libsession.utilities.recipients.Recipient
import org.session.libsignal.streams.ProfileCipherInputStream
import org.session.libsignal.utilities.Log
@@ -19,12 +17,13 @@ import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.InputStream
import java.security.SecureRandom
import java.util.concurrent.ConcurrentSkipListSet
class RetrieveProfileAvatarJob(private val profileAvatar: String?, private val recipientAddress: Address): Job {
class RetrieveProfileAvatarJob(private val profileAvatar: String?, val recipientAddress: Address): Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
override val maxFailureCount: Int = 0
override val maxFailureCount: Int = 3
companion object {
val TAG = RetrieveProfileAvatarJob::class.simpleName
@@ -33,20 +32,30 @@ class RetrieveProfileAvatarJob(private val profileAvatar: String?, private val r
// Keys used for database storage
private const val PROFILE_AVATAR_KEY = "profileAvatar"
private const val RECEIPIENT_ADDRESS_KEY = "recipient"
val errorUrls = ConcurrentSkipListSet<String>()
}
override fun execute(dispatcherName: String) {
override suspend fun execute(dispatcherName: String) {
val delegate = delegate ?: return
if (profileAvatar in errorUrls) return delegate.handleJobFailed(this, dispatcherName, Exception("Profile URL 404'd this app instance"))
val context = MessagingModuleConfiguration.shared.context
val storage = MessagingModuleConfiguration.shared.storage
val recipient = Recipient.from(context, recipientAddress, true)
val profileKey = recipient.resolve().profileKey
if (profileKey == null || (profileKey.size != 32 && profileKey.size != 16)) {
Log.w(TAG, "Recipient profile key is gone!")
return
return delegate.handleJobFailedPermanently(this, dispatcherName, Exception("Recipient profile key is gone!"))
}
if (AvatarHelper.avatarFileExists(context, recipient.resolve().address) && equals(profileAvatar, recipient.resolve().profileAvatar)) {
// Commit '78d1e9d' (fix: open group threads and avatar downloads) had this commented out so
// it's now limited to just the current user case
if (
recipient.isLocalNumber &&
AvatarHelper.avatarFileExists(context, recipient.resolve().address) &&
equals(profileAvatar, recipient.resolve().profileAvatar)
) {
Log.w(TAG, "Already retrieved profile avatar: $profileAvatar")
return
}
@@ -72,16 +81,23 @@ class RetrieveProfileAvatarJob(private val profileAvatar: String?, private val r
val decryptDestination = File.createTempFile("avatar", ".jpg", context.cacheDir)
copy(avatarStream, FileOutputStream(decryptDestination))
decryptDestination.renameTo(AvatarHelper.getAvatarFile(context, recipient.address))
if (recipient.isLocalNumber) {
setProfileAvatarId(context, SecureRandom().nextInt())
setProfilePictureURL(context, profileAvatar)
}
storage.setProfileAvatar(recipient, profileAvatar)
} catch (e: Exception) {
Log.e("Loki", "Failed to download profile avatar", e)
if (failureCount + 1 >= maxFailureCount) {
errorUrls += profileAvatar
}
return delegate.handleJobFailed(this, dispatcherName, e)
} finally {
downloadDestination.delete()
}
if (recipient.isLocalNumber) {
setProfileAvatarId(context, SecureRandom().nextInt())
setProfilePictureURL(context, profileAvatar)
}
storage.setProfileAvatar(recipient, profileAvatar)
return delegate.handleJobSucceeded(this, dispatcherName)
}
override fun serialize(): Data {

View File

@@ -16,6 +16,7 @@ class SessionJobManagerFactories {
GroupAvatarDownloadJob.KEY to GroupAvatarDownloadJob.Factory(),
BackgroundGroupAddJob.KEY to BackgroundGroupAddJob.Factory(),
OpenGroupDeleteJob.KEY to OpenGroupDeleteJob.Factory(),
ConfigurationSyncJob.KEY to ConfigurationSyncJob.Factory()
)
}
}

View File

@@ -20,7 +20,7 @@ class TrimThreadJob(val threadId: Long, val openGroupId: String?) : Job {
const val THREAD_LENGTH_TRIGGER_SIZE = 2000
}
override fun execute(dispatcherName: String) {
override suspend fun execute(dispatcherName: String) {
val context = MessagingModuleConfiguration.shared.context
val trimmingEnabled = TextSecurePreferences.isThreadLengthTrimmingEnabled(context)
val storage = MessagingModuleConfiguration.shared.storage

View File

@@ -1,6 +1,9 @@
package org.session.libsession.messaging.messages
import com.google.protobuf.ByteString
import org.session.libsession.database.StorageProtocol
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.protos.SignalServiceProtos
@@ -11,6 +14,7 @@ abstract class Message {
var receivedTimestamp: Long? = null
var recipient: String? = null
var sender: String? = null
var isSenderSelf: Boolean = false
var groupPublicKey: String? = null
var openGroupServerMessageID: Long? = null
var serverHash: String? = null
@@ -18,6 +22,17 @@ abstract class Message {
open val ttl: Long = 14 * 24 * 60 * 60 * 1000
open val isSelfSendValid: Boolean = false
companion object {
fun getThreadId(message: Message, openGroupID: String?, storage: StorageProtocol, shouldCreateThread: Boolean): Long? {
val senderOrSync = when (message) {
is VisibleMessage -> message.syncTarget ?: message.sender!!
is ExpirationTimerUpdate -> message.syncTarget ?: message.sender!!
else -> message.sender!!
}
return storage.getThreadIdFor(senderOrSync, message.groupPublicKey, openGroupID, createThread = shouldCreateThread)
}
}
open fun isValid(): Boolean {
val sentTimestamp = sentTimestamp
if (sentTimestamp != null && sentTimestamp <= 0) { return false }

View File

@@ -122,9 +122,9 @@ class ConfigurationMessage(var closedGroups: List<ClosedGroup>, var openGroups:
val displayName = TextSecurePreferences.getProfileName(context) ?: return null
val profilePicture = TextSecurePreferences.getProfilePictureURL(context)
val profileKey = ProfileKeyUtil.getProfileKey(context)
val groups = storage.getAllGroups()
val groups = storage.getAllGroups(includeInactive = false)
for (group in groups) {
if (group.isClosedGroup) {
if (group.isClosedGroup && group.isActive) {
if (!group.members.contains(Address.fromSerialized(storage.getUserPublicKey()!!))) continue
val groupPublicKey = GroupUtil.doubleDecodeGroupID(group.encodedId).toHexString()
val encryptionKeyPair = storage.getLatestClosedGroupEncryptionKeyPair(groupPublicKey) ?: continue

View File

@@ -0,0 +1,36 @@
package org.session.libsession.messaging.messages.control
import com.google.protobuf.ByteString
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.protos.SignalServiceProtos.SharedConfigMessage
class SharedConfigurationMessage(val kind: SharedConfigMessage.Kind, val data: ByteArray, val seqNo: Long): ControlMessage() {
override val ttl: Long = 30 * 24 * 60 * 60 * 1000L
override val isSelfSendValid: Boolean = true
companion object {
fun fromProto(proto: SignalServiceProtos.Content): SharedConfigurationMessage? {
if (!proto.hasSharedConfigMessage()) return null
val sharedConfig = proto.sharedConfigMessage
if (!sharedConfig.hasKind() || !sharedConfig.hasData()) return null
return SharedConfigurationMessage(sharedConfig.kind, sharedConfig.data.toByteArray(), sharedConfig.seqno)
}
}
override fun isValid(): Boolean {
if (!super.isValid()) return false
return data.isNotEmpty() && seqNo >= 0
}
override fun toProto(): SignalServiceProtos.Content? {
val sharedConfigurationMessage = SharedConfigMessage.newBuilder()
.setKind(kind)
.setSeqno(seqNo)
.setData(ByteString.copyFrom(data))
.build()
return SignalServiceProtos.Content.newBuilder()
.setSharedConfigMessage(sharedConfigurationMessage)
.build()
}
}

View File

@@ -9,6 +9,7 @@ import org.session.libsession.messaging.messages.control.DataExtractionNotificat
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.messages.control.MessageRequestResponse
import org.session.libsession.messaging.messages.control.ReadReceipt
import org.session.libsession.messaging.messages.control.SharedConfigurationMessage
import org.session.libsession.messaging.messages.control.TypingIndicator
import org.session.libsession.messaging.messages.control.UnsendRequest
import org.session.libsession.messaging.messages.visible.VisibleMessage
@@ -34,13 +35,14 @@ object MessageReceiver {
object NoThread: Error("Couldn't find thread for message.")
object SelfSend: Error("Message addressed at self.")
object InvalidGroupPublicKey: Error("Invalid group public key.")
object NoGroupThread: Error("No thread exists for this group.")
object NoGroupKeyPair: Error("Missing group key pair.")
object NoUserED25519KeyPair : Error("Couldn't find user ED25519 key pair.")
internal val isRetryable: Boolean = when (this) {
is DuplicateMessage, is InvalidMessage, is UnknownMessage,
is UnknownEnvelopeType, is InvalidSignature, is NoData,
is SenderBlocked, is SelfSend -> false
is SenderBlocked, is SelfSend, is NoGroupThread -> false
else -> true
}
}
@@ -51,6 +53,7 @@ object MessageReceiver {
isOutgoing: Boolean? = null,
otherBlindedPublicKey: String? = null,
openGroupPublicKey: String? = null,
currentClosedGroups: Set<String>?
): Pair<Message, SignalServiceProtos.Content> {
val storage = MessagingModuleConfiguration.shared.storage
val userPublicKey = storage.getUserPublicKey()
@@ -70,7 +73,7 @@ object MessageReceiver {
} else {
when (envelope.type) {
SignalServiceProtos.Envelope.Type.SESSION_MESSAGE -> {
if (IdPrefix.fromValue(envelope.source) == IdPrefix.BLINDED) {
if (IdPrefix.fromValue(envelope.source)?.isBlinded() == true) {
openGroupPublicKey ?: throw Error.InvalidGroupPublicKey
otherBlindedPublicKey ?: throw Error.DecryptionFailed
val decryptionResult = MessageDecrypter.decryptBlinded(
@@ -139,6 +142,7 @@ object MessageReceiver {
UnsendRequest.fromProto(proto) ?:
MessageRequestResponse.fromProto(proto) ?:
CallMessage.fromProto(proto) ?:
SharedConfigurationMessage.fromProto(proto) ?:
VisibleMessage.fromProto(proto) ?: run {
throw Error.UnknownMessage
}
@@ -147,6 +151,9 @@ object MessageReceiver {
if (!message.isSelfSendValid && (sender == userPublicKey || isUserBlindedSender)) {
throw Error.SelfSend
}
if (sender == userPublicKey || isUserBlindedSender) {
message.isSenderSelf = true
}
// Guard against control messages in open groups
if (isOpenGroupMessage && message !is VisibleMessage) {
throw Error.InvalidMessage
@@ -167,12 +174,16 @@ object MessageReceiver {
// If the message failed to process the first time around we retry it later (if the error is retryable). In this case the timestamp
// will already be in the database but we don't want to treat the message as a duplicate. The isRetry flag is a simple workaround
// for this issue.
if (message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) {
if (groupPublicKey != null && groupPublicKey !in (currentClosedGroups ?: emptySet())) {
throw Error.NoGroupThread
}
if ((message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) || message is SharedConfigurationMessage) {
// Allow duplicates in this case to avoid the following situation:
// • The app performed a background poll or received a push notification
// • This method was invoked and the received message timestamps table was updated
// • Processing wasn't finished
// • The user doesn't see the new closed group
// also allow shared configuration messages to be duplicates since we track hashes separately use seqno for conflict resolution
} else {
if (storage.isDuplicateMessage(envelope.timestamp)) { throw Error.DuplicateMessage }
storage.addReceivedMessageTimestamp(envelope.timestamp)

View File

@@ -13,6 +13,7 @@ import org.session.libsession.messaging.messages.control.ClosedGroupControlMessa
import org.session.libsession.messaging.messages.control.ConfigurationMessage
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.messages.control.MessageRequestResponse
import org.session.libsession.messaging.messages.control.SharedConfigurationMessage
import org.session.libsession.messaging.messages.control.UnsendRequest
import org.session.libsession.messaging.messages.visible.LinkPreview
import org.session.libsession.messaging.messages.visible.Quote
@@ -61,7 +62,7 @@ object MessageSender {
}
// Convenience
fun send(message: Message, destination: Destination, isSyncMessage: Boolean = false): Promise<Unit, Exception> {
fun send(message: Message, destination: Destination, isSyncMessage: Boolean): Promise<Unit, Exception> {
return if (destination is Destination.LegacyOpenGroup || destination is Destination.OpenGroup || destination is Destination.OpenGroupInbox) {
sendToOpenGroupDestination(destination, message)
} else {
@@ -69,71 +70,115 @@ object MessageSender {
}
}
// One-on-One Chats & Closed Groups
@Throws(Exception::class)
fun buildWrappedMessageToSnode(destination: Destination, message: Message, isSyncMessage: Boolean): SnodeMessage {
val storage = MessagingModuleConfiguration.shared.storage
val userPublicKey = storage.getUserPublicKey()
// Set the timestamp, sender and recipient
val messageSendTime = SnodeAPI.nowWithOffset
if (message.sentTimestamp == null) {
message.sentTimestamp =
messageSendTime // Visible messages will already have their sent timestamp set
}
message.sender = userPublicKey
when (destination) {
is Destination.Contact -> message.recipient = destination.publicKey
is Destination.ClosedGroup -> message.recipient = destination.groupPublicKey
else -> throw IllegalStateException("Destination should not be an open group.")
}
val isSelfSend = (message.recipient == userPublicKey)
// Validate the message
if (!message.isValid()) {
throw Error.InvalidMessage
}
// Stop here if this is a self-send, unless it's:
// • a configuration message
// • a sync message
// • a closed group control message of type `new`
var isNewClosedGroupControlMessage = false
if (message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) isNewClosedGroupControlMessage =
true
if (isSelfSend
&& message !is ConfigurationMessage
&& !isSyncMessage
&& !isNewClosedGroupControlMessage
&& message !is UnsendRequest
&& message !is SharedConfigurationMessage
) {
throw Error.InvalidMessage
}
// Attach the user's profile if needed
if (message is VisibleMessage) {
message.profile = storage.getUserProfile()
}
if (message is MessageRequestResponse) {
message.profile = storage.getUserProfile()
}
// Convert it to protobuf
val proto = message.toProto() ?: throw Error.ProtoConversionFailed
// Serialize the protobuf
val plaintext = PushTransportDetails.getPaddedMessageBody(proto.toByteArray())
// Encrypt the serialized protobuf
val ciphertext = when (destination) {
is Destination.Contact -> MessageEncrypter.encrypt(plaintext, destination.publicKey)
is Destination.ClosedGroup -> {
val encryptionKeyPair =
MessagingModuleConfiguration.shared.storage.getLatestClosedGroupEncryptionKeyPair(
destination.groupPublicKey
)!!
MessageEncrypter.encrypt(plaintext, encryptionKeyPair.hexEncodedPublicKey)
}
else -> throw IllegalStateException("Destination should not be open group.")
}
// Wrap the result
val kind: SignalServiceProtos.Envelope.Type
val senderPublicKey: String
when (destination) {
is Destination.Contact -> {
kind = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE
senderPublicKey = ""
}
is Destination.ClosedGroup -> {
kind = SignalServiceProtos.Envelope.Type.CLOSED_GROUP_MESSAGE
senderPublicKey = destination.groupPublicKey
}
else -> throw IllegalStateException("Destination should not be open group.")
}
val wrappedMessage = MessageWrapper.wrap(kind, message.sentTimestamp!!, senderPublicKey, ciphertext)
val base64EncodedData = Base64.encodeBytes(wrappedMessage)
// Send the result
return SnodeMessage(
message.recipient!!,
base64EncodedData,
message.ttl,
messageSendTime
)
}
// One-on-One Chats & Closed Groups
private fun sendToSnodeDestination(destination: Destination, message: Message, isSyncMessage: Boolean = false): Promise<Unit, Exception> {
val deferred = deferred<Unit, Exception>()
val promise = deferred.promise
val storage = MessagingModuleConfiguration.shared.storage
val userPublicKey = storage.getUserPublicKey()
// Set the timestamp, sender and recipient
if (message.sentTimestamp == null) {
message.sentTimestamp = SnodeAPI.nowWithOffset // Visible messages will already have their sent timestamp set
}
val messageSendTime = SnodeAPI.nowWithOffset
// recipient will be set later, so initialize it as a function here
val isSelfSend = { message.recipient == userPublicKey }
message.sender = userPublicKey
val isSelfSend = (message.recipient == userPublicKey)
// Set the failure handler (need it here already for precondition failure handling)
fun handleFailure(error: Exception) {
handleFailedMessageSend(message, error, isSyncMessage)
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) {
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend()) {
SnodeModule.shared.broadcaster.broadcast("messageFailed", message.sentTimestamp!!)
}
deferred.reject(error)
}
try {
when (destination) {
is Destination.Contact -> message.recipient = destination.publicKey
is Destination.ClosedGroup -> message.recipient = destination.groupPublicKey
else -> throw IllegalStateException("Destination should not be an open group.")
}
// Validate the message
if (!message.isValid()) { throw Error.InvalidMessage }
// Stop here if this is a self-send, unless it's:
// • a configuration message
// • a sync message
// • a closed group control message of type `new`
var isNewClosedGroupControlMessage = false
if (message is ClosedGroupControlMessage && message.kind is ClosedGroupControlMessage.Kind.New) isNewClosedGroupControlMessage = true
if (isSelfSend && message !is ConfigurationMessage && !isSyncMessage && !isNewClosedGroupControlMessage && message !is UnsendRequest) {
handleSuccessfulMessageSend(message, destination)
deferred.resolve(Unit)
return promise
}
// Attach the user's profile if needed
if (message is VisibleMessage) {
message.profile = storage.getUserProfile()
}
if (message is MessageRequestResponse) {
message.profile = storage.getUserProfile()
}
// Convert it to protobuf
val proto = message.toProto() ?: throw Error.ProtoConversionFailed
// Serialize the protobuf
val plaintext = PushTransportDetails.getPaddedMessageBody(proto.toByteArray())
// Encrypt the serialized protobuf
val ciphertext = when (destination) {
is Destination.Contact -> MessageEncrypter.encrypt(plaintext, destination.publicKey)
is Destination.ClosedGroup -> {
val encryptionKeyPair = MessagingModuleConfiguration.shared.storage.getLatestClosedGroupEncryptionKeyPair(destination.groupPublicKey)!!
MessageEncrypter.encrypt(plaintext, encryptionKeyPair.hexEncodedPublicKey)
}
else -> throw IllegalStateException("Destination should not be open group.")
}
// Wrap the result
val kind: SignalServiceProtos.Envelope.Type
val senderPublicKey: String
val snodeMessage = buildWrappedMessageToSnode(destination, message, isSyncMessage)
// TODO: this might change in future for config messages
val forkInfo = SnodeAPI.forkInfo
val namespaces: List<Int> = when {
@@ -143,29 +188,6 @@ object MessageSender {
&& forkInfo.hasNamespaces() -> listOf(Namespace.UNAUTHENTICATED_CLOSED_GROUP, Namespace.DEFAULT)
else -> listOf(Namespace.DEFAULT)
}
when (destination) {
is Destination.Contact -> {
kind = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE
senderPublicKey = ""
}
is Destination.ClosedGroup -> {
kind = SignalServiceProtos.Envelope.Type.CLOSED_GROUP_MESSAGE
senderPublicKey = destination.groupPublicKey
}
else -> throw IllegalStateException("Destination should not be open group.")
}
val wrappedMessage = MessageWrapper.wrap(kind, message.sentTimestamp!!, senderPublicKey, ciphertext)
// Send the result
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) {
SnodeModule.shared.broadcaster.broadcast("calculatingPoW", messageSendTime)
}
val base64EncodedData = Base64.encodeBytes(wrappedMessage)
// Send the result
val timestamp = messageSendTime + SnodeAPI.clockOffset
val snodeMessage = SnodeMessage(message.recipient!!, base64EncodedData, message.ttl, timestamp)
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) {
SnodeModule.shared.broadcaster.broadcast("sendingMessage", messageSendTime)
}
namespaces.map { namespace -> SnodeAPI.sendMessage(snodeMessage, requiresAuth = false, namespace = namespace) }.let { promises ->
var isSuccess = false
val promiseCount = promises.size
@@ -174,9 +196,6 @@ object MessageSender {
promise.success {
if (isSuccess) { return@success } // Succeed as soon as the first promise succeeds
isSuccess = true
if (destination is Destination.Contact && message is VisibleMessage && !isSelfSend) {
SnodeModule.shared.broadcaster.broadcast("messageSent", messageSendTime)
}
val hash = it["hash"] as? String
message.serverHash = hash
handleSuccessfulMessageSend(message, destination, isSyncMessage)
@@ -414,24 +433,24 @@ object MessageSender {
@JvmStatic
fun send(message: Message, address: Address) {
val threadID = MessagingModuleConfiguration.shared.storage.getOrCreateThreadIdFor(address)
val threadID = MessagingModuleConfiguration.shared.storage.getThreadId(address)
message.threadID = threadID
val destination = Destination.from(address)
val job = MessageSendJob(message, destination)
JobQueue.shared.add(job)
}
fun sendNonDurably(message: VisibleMessage, attachments: List<SignalAttachment>, address: Address): Promise<Unit, Exception> {
fun sendNonDurably(message: VisibleMessage, attachments: List<SignalAttachment>, address: Address, isSyncMessage: Boolean): Promise<Unit, Exception> {
val attachmentIDs = MessagingModuleConfiguration.shared.messageDataProvider.getAttachmentIDsFor(message.id!!)
message.attachmentIDs.addAll(attachmentIDs)
return sendNonDurably(message, address)
return sendNonDurably(message, address, isSyncMessage)
}
fun sendNonDurably(message: Message, address: Address): Promise<Unit, Exception> {
val threadID = MessagingModuleConfiguration.shared.storage.getOrCreateThreadIdFor(address)
fun sendNonDurably(message: Message, address: Address, isSyncMessage: Boolean): Promise<Unit, Exception> {
val threadID = MessagingModuleConfiguration.shared.storage.getThreadId(address)
message.threadID = threadID
val destination = Destination.from(address)
return send(message, destination)
return send(message, destination, isSyncMessage)
}
// Closed groups

View File

@@ -18,14 +18,14 @@ import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsession.utilities.recipients.Recipient
import org.session.libsignal.crypto.ecc.Curve
import org.session.libsignal.crypto.ecc.ECKeyPair
import org.session.libsignal.utilities.guava.Optional
import org.session.libsignal.messages.SignalServiceGroup
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.ThreadUtils
import org.session.libsignal.utilities.guava.Optional
import org.session.libsignal.utilities.hexEncodedPublicKey
import org.session.libsignal.utilities.removingIdPrefixIfNeeded
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.ThreadUtils
import org.session.libsignal.utilities.Log
import java.util.*
import java.util.concurrent.ConcurrentHashMap
@@ -60,16 +60,20 @@ fun MessageSender.create(name: String, members: Collection<String>): Promise<Str
// Add the group to the user's set of public keys to poll for
storage.addClosedGroupPublicKey(groupPublicKey)
// Store the encryption key pair
storage.addClosedGroupEncryptionKeyPair(encryptionKeyPair, groupPublicKey)
storage.addClosedGroupEncryptionKeyPair(encryptionKeyPair, groupPublicKey, sentTime)
// Create the thread
storage.getOrCreateThreadIdFor(Address.fromSerialized(groupID))
// Notify the user
val threadID = storage.getOrCreateThreadIdFor(Address.fromSerialized(groupID))
storage.insertOutgoingInfoMessage(context, groupID, SignalServiceGroup.Type.CREATION, name, members, admins, threadID, sentTime)
val ourPubKey = storage.getUserPublicKey()
for (member in members) {
val closedGroupControlMessage = ClosedGroupControlMessage(closedGroupUpdateKind)
closedGroupControlMessage.sentTimestamp = sentTime
try {
sendNonDurably(closedGroupControlMessage, Address.fromSerialized(member)).get()
sendNonDurably(closedGroupControlMessage, Address.fromSerialized(member), member == ourPubKey).get()
} catch (e: Exception) {
// We failed to properly create the group so delete it's associated data (in the past
// we didn't create this data until the messages successfully sent but this resulted
@@ -82,6 +86,8 @@ fun MessageSender.create(name: String, members: Collection<String>): Promise<Str
}
}
// Add the group to the config now that it was successfully created
storage.createInitialConfigGroup(groupPublicKey, name, GroupUtil.createConfigMemberMap(members, admins), sentTime, encryptionKeyPair)
// Notify the PN server
PushNotificationAPI.performOperation(PushNotificationAPI.ClosedGroupOperation.Subscribe, groupPublicKey, userPublicKey)
// Start polling
@@ -93,24 +99,6 @@ fun MessageSender.create(name: String, members: Collection<String>): Promise<Str
return deferred.promise
}
fun MessageSender.update(groupPublicKey: String, members: List<String>, name: String) {
val context = MessagingModuleConfiguration.shared.context
val storage = MessagingModuleConfiguration.shared.storage
val groupID = GroupUtil.doubleEncodeGroupID(groupPublicKey)
val group = storage.getGroup(groupID) ?: run {
Log.d("Loki", "Can't update nonexistent closed group.")
throw Error.NoThread
}
// Update name if needed
if (name != group.title) { setName(groupPublicKey, name) }
// Add members if needed
val addedMembers = members - group.members.map { it.serialize() }
if (!addedMembers.isEmpty()) { addMembers(groupPublicKey, addedMembers) }
// Remove members if needed
val removedMembers = group.members.map { it.serialize() } - members
if (removedMembers.isEmpty()) { removeMembers(groupPublicKey, removedMembers) }
}
fun MessageSender.setName(groupPublicKey: String, newName: String) {
val context = MessagingModuleConfiguration.shared.context
val storage = MessagingModuleConfiguration.shared.storage
@@ -252,15 +240,15 @@ fun MessageSender.leave(groupPublicKey: String, notifyUser: Boolean = true): Pro
val sentTime = SnodeAPI.nowWithOffset
closedGroupControlMessage.sentTimestamp = sentTime
storage.setActive(groupID, false)
sendNonDurably(closedGroupControlMessage, Address.fromSerialized(groupID)).success {
sendNonDurably(closedGroupControlMessage, Address.fromSerialized(groupID), isSyncMessage = false).success {
// Notify the user
val infoType = SignalServiceGroup.Type.QUIT
val threadID = storage.getOrCreateThreadIdFor(Address.fromSerialized(groupID))
if (notifyUser) {
val threadID = storage.getOrCreateThreadIdFor(Address.fromSerialized(groupID))
storage.insertOutgoingInfoMessage(context, groupID, infoType, name, updatedMembers, admins, threadID, sentTime)
}
// Remove the group private key and unsubscribe from PNs
MessageReceiver.disableLocalGroupAndUnsubscribe(groupPublicKey, groupID, userPublicKey)
MessageReceiver.disableLocalGroupAndUnsubscribe(groupPublicKey, groupID, userPublicKey, true)
deferred.resolve(Unit)
}.fail {
storage.setActive(groupID, true)
@@ -292,7 +280,7 @@ fun MessageSender.generateAndSendNewEncryptionKeyPair(groupPublicKey: String, ta
// Distribute it
sendEncryptionKeyPair(groupPublicKey, newKeyPair, targetMembers)?.success {
// Store it * after * having sent out the message to the group
storage.addClosedGroupEncryptionKeyPair(newKeyPair, groupPublicKey)
storage.addClosedGroupEncryptionKeyPair(newKeyPair, groupPublicKey, SnodeAPI.nowWithOffset)
pendingKeyPairs[groupPublicKey] = Optional.absent()
}
}
@@ -312,7 +300,8 @@ fun MessageSender.sendEncryptionKeyPair(groupPublicKey: String, newKeyPair: ECKe
val closedGroupControlMessage = ClosedGroupControlMessage(kind)
closedGroupControlMessage.sentTimestamp = sentTime
return if (force) {
MessageSender.sendNonDurably(closedGroupControlMessage, Address.fromSerialized(destination))
val isSync = MessagingModuleConfiguration.shared.storage.getUserPublicKey() == destination
MessageSender.sendNonDurably(closedGroupControlMessage, Address.fromSerialized(destination), isSyncMessage = isSync)
} else {
MessageSender.send(closedGroupControlMessage, Address.fromSerialized(destination))
null

View File

@@ -1,11 +1,11 @@
package org.session.libsession.messaging.sending_receiving
import android.text.TextUtils
import network.loki.messenger.libsession_util.ConfigBase
import org.session.libsession.avatars.AvatarHelper
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.BackgroundGroupAddJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.RetrieveProfileAvatarJob
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.CallMessage
import org.session.libsession.messaging.messages.control.ClosedGroupControlMessage
@@ -42,6 +42,7 @@ import org.session.libsignal.crypto.ecc.DjbECPublicKey
import org.session.libsignal.crypto.ecc.ECKeyPair
import org.session.libsignal.messages.SignalServiceGroup
import org.session.libsignal.protos.SignalServiceProtos
import org.session.libsignal.protos.SignalServiceProtos.SharedConfigMessage
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.IdPrefix
import org.session.libsignal.utilities.Log
@@ -58,7 +59,10 @@ internal fun MessageReceiver.isBlocked(publicKey: String): Boolean {
return recipient.isBlocked
}
fun MessageReceiver.handle(message: Message, proto: SignalServiceProtos.Content, openGroupID: String?) {
fun MessageReceiver.handle(message: Message, proto: SignalServiceProtos.Content, threadId: Long, openGroupID: String?) {
// Do nothing if the message was outdated
if (MessageReceiver.messageIsOutdated(message, threadId, openGroupID)) { return }
when (message) {
is ReadReceipt -> handleReadReceipt(message)
is TypingIndicator -> handleTypingIndicator(message)
@@ -68,8 +72,8 @@ fun MessageReceiver.handle(message: Message, proto: SignalServiceProtos.Content,
is ConfigurationMessage -> handleConfigurationMessage(message)
is UnsendRequest -> handleUnsendRequest(message)
is MessageRequestResponse -> handleMessageRequestResponse(message)
is VisibleMessage -> handleVisibleMessage(message, proto, openGroupID,
runIncrement = true,
is VisibleMessage -> handleVisibleMessage(
message, proto, openGroupID, threadId,
runThreadUpdate = true,
runProfileUpdate = true
)
@@ -77,6 +81,33 @@ fun MessageReceiver.handle(message: Message, proto: SignalServiceProtos.Content,
}
}
fun MessageReceiver.messageIsOutdated(message: Message, threadId: Long, openGroupID: String?): Boolean {
when (message) {
is ReadReceipt -> return false // No visible artifact created so better to keep for more reliable read states
is UnsendRequest -> return false // We should always process the removal of messages just in case
}
// Determine the state of the conversation and the validity of the message
val storage = MessagingModuleConfiguration.shared.storage
val userPublicKey = storage.getUserPublicKey()!!
val threadRecipient = storage.getRecipientForThread(threadId)
val conversationVisibleInConfig = storage.conversationInConfig(
if (message.groupPublicKey == null) threadRecipient?.address?.serialize() else null,
message.groupPublicKey,
openGroupID,
true
)
val canPerformChange = storage.canPerformConfigChange(
if (threadRecipient?.address?.serialize() == userPublicKey) SharedConfigMessage.Kind.USER_PROFILE.name else SharedConfigMessage.Kind.CONTACTS.name,
userPublicKey,
message.sentTimestamp!!
)
// If the thread is visible or the message was sent more recently than the last config message (minus
// buffer period) then we should process the message, if not then the message is outdated
return (!conversationVisibleInConfig && !canPerformChange)
}
// region Control Messages
private fun MessageReceiver.handleReadReceipt(message: ReadReceipt) {
val context = MessagingModuleConfiguration.shared.context
@@ -129,6 +160,7 @@ private fun MessageReceiver.handleDataExtractionNotification(message: DataExtrac
if (message.groupPublicKey != null) return
val storage = MessagingModuleConfiguration.shared.storage
val senderPublicKey = message.sender!!
val notification: DataExtractionNotificationInfoMessage = when(message.kind) {
is DataExtractionNotification.Kind.Screenshot -> DataExtractionNotificationInfoMessage(DataExtractionNotificationInfoMessage.Kind.SCREENSHOT)
is DataExtractionNotification.Kind.MediaSaved -> DataExtractionNotificationInfoMessage(DataExtractionNotificationInfoMessage.Kind.MEDIA_SAVED)
@@ -149,11 +181,17 @@ private fun handleConfigurationMessage(message: ConfigurationMessage) {
TextSecurePreferences.setConfigurationMessageSynced(context, true)
TextSecurePreferences.setLastProfileUpdateTime(context, message.sentTimestamp!!)
val isForceSync = TextSecurePreferences.hasForcedNewConfig(context)
val currentTime = SnodeAPI.nowWithOffset
if (ConfigBase.isNewConfigEnabled(isForceSync, currentTime)) {
TextSecurePreferences.setHasLegacyConfig(context, true)
if (!firstTimeSync) return
}
val allClosedGroupPublicKeys = storage.getAllClosedGroupPublicKeys()
for (closedGroup in message.closedGroups) {
if (allClosedGroupPublicKeys.contains(closedGroup.publicKey)) {
// just handle the closed group encryption key pairs to avoid sync'd devices getting out of sync
storage.addClosedGroupEncryptionKeyPair(closedGroup.encryptionKeyPair!!, closedGroup.publicKey)
storage.addClosedGroupEncryptionKeyPair(closedGroup.encryptionKeyPair!!, closedGroup.publicKey, message.sentTimestamp!!)
} else if (firstTimeSync) {
// only handle new closed group if it's first time sync
handleNewClosedGroup(message.sender!!, message.sentTimestamp!!, closedGroup.publicKey, closedGroup.name,
@@ -166,9 +204,9 @@ private fun handleConfigurationMessage(message: ConfigurationMessage) {
.replace(OpenGroupApi.httpDefaultServer, OpenGroupApi.defaultServer)
}) {
if (allV2OpenGroups.contains(openGroup)) continue
Log.d("OpenGroup", "All open groups doesn't contain $openGroup")
Log.d("OpenGroup", "All open groups doesn't contain open group")
if (!storage.hasBackgroundGroupAddJob(openGroup)) {
Log.d("OpenGroup", "Doesn't contain background job for $openGroup, adding")
Log.d("OpenGroup", "Doesn't contain background job for open group, adding")
JobQueue.shared.add(BackgroundGroupAddJob(openGroup))
}
}
@@ -182,10 +220,7 @@ private fun handleConfigurationMessage(message: ConfigurationMessage) {
&& TextSecurePreferences.getProfilePictureURL(context) != message.profilePicture) {
val profileKey = Base64.encodeBytes(message.profileKey)
ProfileKeyUtil.setEncodedProfileKey(context, profileKey)
profileManager.setProfileKey(context, recipient, message.profileKey)
if (!message.profilePicture.isNullOrEmpty() && TextSecurePreferences.getProfilePictureURL(context) != message.profilePicture) {
JobQueue.shared.add(RetrieveProfileAvatarJob(message.profilePicture!!, recipient.address))
}
profileManager.setProfilePicture(context, recipient, message.profilePicture, message.profileKey)
}
storage.addContacts(message.contacts)
}
@@ -215,24 +250,28 @@ fun handleMessageRequestResponse(message: MessageRequestResponse) {
}
//endregion
fun MessageReceiver.handleVisibleMessage(message: VisibleMessage,
proto: SignalServiceProtos.Content,
openGroupID: String?,
runIncrement: Boolean,
runThreadUpdate: Boolean,
runProfileUpdate: Boolean): Long? {
fun MessageReceiver.handleVisibleMessage(
message: VisibleMessage,
proto: SignalServiceProtos.Content,
openGroupID: String?,
threadId: Long,
runThreadUpdate: Boolean,
runProfileUpdate: Boolean
): Long? {
val storage = MessagingModuleConfiguration.shared.storage
val context = MessagingModuleConfiguration.shared.context
val userPublicKey = storage.getUserPublicKey()
val messageSender: String? = message.sender
// Do nothing if the message was outdated
if (MessageReceiver.messageIsOutdated(message, threadId, openGroupID)) { return null }
// Get or create thread
// FIXME: In case this is an open group this actually * doesn't * create the thread if it doesn't yet
// exist. This is intentional, but it's very non-obvious.
val threadID = storage.getOrCreateThreadIdFor(message.syncTarget ?: messageSender!!, message.groupPublicKey, openGroupID)
if (threadID < 0) {
val threadID = storage.getThreadIdFor(message.syncTarget ?: messageSender!!, message.groupPublicKey, openGroupID, createThread = true)
// Thread doesn't exist; should only be reached in a case where we are processing open group messages for a no longer existent thread
throw MessageReceiver.Error.NoThread
}
?: throw MessageReceiver.Error.NoThread
val threadRecipient = storage.getRecipientForThread(threadID)
val userBlindedKey = openGroupID?.let {
val openGroup = storage.getOpenGroup(threadID) ?: return@let null
@@ -259,9 +298,10 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage,
val profileKeyChanged = (recipient.profileKey == null || !MessageDigest.isEqual(recipient.profileKey, newProfileKey))
if ((profileKeyValid && profileKeyChanged) || (profileKeyValid && needsProfilePicture)) {
profileManager.setProfileKey(context, recipient, newProfileKey!!)
profileManager.setProfilePicture(context, recipient, profile.profilePictureURL, newProfileKey)
profileManager.setUnidentifiedAccessMode(context, recipient, Recipient.UnidentifiedAccessMode.UNKNOWN)
profileManager.setProfilePictureURL(context, recipient, profile.profilePictureURL!!)
} else if (newProfileKey == null || newProfileKey.isEmpty() || profile.profilePictureURL.isNullOrEmpty()) {
profileManager.setProfilePicture(context, recipient, null, null)
}
}
}
@@ -344,7 +384,7 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage,
message.threadID = threadID
val messageID =
storage.persist(message, quoteModel, linkPreviews, message.groupPublicKey, openGroupID,
attachments, runIncrement, runThreadUpdate
attachments, runThreadUpdate
) ?: return null
val openGroupServerID = message.openGroupServerMessageID
if (openGroupServerID != null) {
@@ -437,12 +477,34 @@ private fun MessageReceiver.handleClosedGroupControlMessage(message: ClosedGroup
is ClosedGroupControlMessage.Kind.MembersRemoved -> handleClosedGroupMembersRemoved(message)
is ClosedGroupControlMessage.Kind.MemberLeft -> handleClosedGroupMemberLeft(message)
}
if (
message.kind !is ClosedGroupControlMessage.Kind.New &&
MessagingModuleConfiguration.shared.storage.canPerformConfigChange(
SharedConfigMessage.Kind.GROUPS.name,
MessagingModuleConfiguration.shared.storage.getUserPublicKey()!!,
message.sentTimestamp!!
)
) {
// update the config
val closedGroupPublicKey = message.getPublicKey()
val storage = MessagingModuleConfiguration.shared.storage
storage.updateGroupConfig(closedGroupPublicKey)
}
}
private fun ClosedGroupControlMessage.getPublicKey(): String = kind!!.let { when (it) {
is ClosedGroupControlMessage.Kind.New -> it.publicKey.toByteArray().toHexString()
is ClosedGroupControlMessage.Kind.EncryptionKeyPair -> it.publicKey?.toByteArray()?.toHexString() ?: groupPublicKey!!
is ClosedGroupControlMessage.Kind.MemberLeft -> groupPublicKey!!
is ClosedGroupControlMessage.Kind.MembersAdded -> groupPublicKey!!
is ClosedGroupControlMessage.Kind.MembersRemoved -> groupPublicKey!!
is ClosedGroupControlMessage.Kind.NameChange -> groupPublicKey!!
}}
private fun MessageReceiver.handleNewClosedGroup(message: ClosedGroupControlMessage) {
val kind = message.kind!! as? ClosedGroupControlMessage.Kind.New ?: return
val recipient = Recipient.from(MessagingModuleConfiguration.shared.context, Address.fromSerialized(message.sender!!), false)
if (!recipient.isApproved && !recipient.isLocalNumber) return
if (!recipient.isApproved && !recipient.isLocalNumber) return Log.e("Loki", "not accepting new closed group from unapproved recipient")
val groupPublicKey = kind.publicKey.toByteArray().toHexString()
val members = kind.members.map { it.toByteArray().toHexString() }
val admins = kind.admins.map { it.toByteArray().toHexString() }
@@ -453,10 +515,24 @@ private fun MessageReceiver.handleNewClosedGroup(message: ClosedGroupControlMess
private fun handleNewClosedGroup(sender: String, sentTimestamp: Long, groupPublicKey: String, name: String, encryptionKeyPair: ECKeyPair, members: List<String>, admins: List<String>, formationTimestamp: Long, expireTimer: Int) {
val context = MessagingModuleConfiguration.shared.context
val storage = MessagingModuleConfiguration.shared.storage
val userPublicKey = TextSecurePreferences.getLocalNumber(context)
// Create the group
val userPublicKey = storage.getUserPublicKey()!!
val groupID = GroupUtil.doubleEncodeGroupID(groupPublicKey)
val groupExists = storage.getGroup(groupID) != null
if (!storage.canPerformConfigChange(SharedConfigMessage.Kind.GROUPS.name, userPublicKey, sentTimestamp)) {
// If the closed group already exists then store the encryption keys (since the config only stores
// the latest key we won't be able to decrypt older messages if we were added to the group within
// the last two weeks and the key has been rotated - unfortunately if the user was added more than
// two weeks ago and the keys were rotated within the last two weeks then we won't be able to decrypt
// messages received before the key rotation)
if (groupExists) {
storage.addClosedGroupEncryptionKeyPair(encryptionKeyPair, groupPublicKey, sentTimestamp)
storage.updateGroupConfig(groupPublicKey)
}
return
}
// Create the group
if (groupExists) {
// Update the group
if (!storage.isGroupActive(groupPublicKey)) {
@@ -475,18 +551,15 @@ private fun handleNewClosedGroup(sender: String, sentTimestamp: Long, groupPubli
// Add the group to the user's set of public keys to poll for
storage.addClosedGroupPublicKey(groupPublicKey)
// Store the encryption key pair
storage.addClosedGroupEncryptionKeyPair(encryptionKeyPair, groupPublicKey)
storage.addClosedGroupEncryptionKeyPair(encryptionKeyPair, groupPublicKey, sentTimestamp)
storage.createInitialConfigGroup(groupPublicKey, name, GroupUtil.createConfigMemberMap(members, admins), formationTimestamp, encryptionKeyPair)
// Set expiration timer
storage.setExpirationTimer(groupID, expireTimer)
// Notify the PN server
PushNotificationAPI.performOperation(PushNotificationAPI.ClosedGroupOperation.Subscribe, groupPublicKey, storage.getUserPublicKey()!!)
// Notify the user
if (userPublicKey == sender && !groupExists) {
val threadID = storage.getOrCreateThreadIdFor(Address.fromSerialized(groupID))
storage.insertOutgoingInfoMessage(context, groupID, SignalServiceGroup.Type.CREATION, name, members, admins, threadID, sentTimestamp)
} else if (userPublicKey != sender) {
storage.insertIncomingInfoMessage(context, sender, groupID, SignalServiceGroup.Type.CREATION, name, members, admins, sentTimestamp)
}
PushNotificationAPI.performOperation(PushNotificationAPI.ClosedGroupOperation.Subscribe, groupPublicKey, userPublicKey)
// Create thread
val threadId = storage.getOrCreateThreadIdFor(Address.fromSerialized(groupID))
storage.setThreadDate(threadId, formationTimestamp)
// Start polling
ClosedGroupPollerV2.shared.startPolling(groupPublicKey)
}
@@ -527,7 +600,7 @@ private fun MessageReceiver.handleClosedGroupEncryptionKeyPair(message: ClosedGr
Log.d("Loki", "Ignoring duplicate closed group encryption key pair.")
return
}
storage.addClosedGroupEncryptionKeyPair(keyPair, groupPublicKey)
storage.addClosedGroupEncryptionKeyPair(keyPair, groupPublicKey, message.sentTimestamp!!)
Log.d("Loki", "Received a new closed group encryption key pair.")
}
@@ -555,7 +628,12 @@ private fun MessageReceiver.handleClosedGroupNameChanged(message: ClosedGroupCon
val members = group.members.map { it.serialize() }
val admins = group.admins.map { it.serialize() }
val name = kind.name
storage.updateTitle(groupID, name)
// Only update the group in storage if it isn't invalidated by the config state
if (storage.canPerformConfigChange(SharedConfigMessage.Kind.GROUPS.name, userPublicKey!!, message.sentTimestamp!!)) {
storage.updateTitle(groupID, name)
}
// Notify the user
if (userPublicKey == senderPublicKey) {
val threadID = storage.getOrCreateThreadIdFor(Address.fromSerialized(groupID))
@@ -589,12 +667,16 @@ private fun MessageReceiver.handleClosedGroupMembersAdded(message: ClosedGroupCo
val updateMembers = kind.members.map { it.toByteArray().toHexString() }
val newMembers = members + updateMembers
storage.updateMembers(groupID, newMembers.map { Address.fromSerialized(it) })
// Update zombie members in case the added members are zombies
val zombies = storage.getZombieMembers(groupID)
if (zombies.intersect(updateMembers).isNotEmpty()) {
storage.setZombieMembers(groupID, zombies.minus(updateMembers).map { Address.fromSerialized(it) })
// Only update the group in storage if it isn't invalidated by the config state
if (storage.canPerformConfigChange(SharedConfigMessage.Kind.GROUPS.name, userPublicKey, message.sentTimestamp!!)) {
storage.updateMembers(groupID, newMembers.map { Address.fromSerialized(it) })
// Update zombie members in case the added members are zombies
val zombies = storage.getZombieMembers(groupID)
if (zombies.intersect(updateMembers).isNotEmpty()) {
storage.setZombieMembers(groupID, zombies.minus(updateMembers).map { Address.fromSerialized(it) })
}
}
// Notify the user
@@ -676,13 +758,18 @@ private fun MessageReceiver.handleClosedGroupMembersRemoved(message: ClosedGroup
Log.d("Loki", "Received a MEMBERS_REMOVED instead of a MEMBERS_LEFT from sender: $senderPublicKey.")
}
val wasCurrentUserRemoved = userPublicKey in removedMembers
// Admin should send a MEMBERS_LEFT message but handled here just in case
if (didAdminLeave || wasCurrentUserRemoved) {
disableLocalGroupAndUnsubscribe(groupPublicKey, groupID, userPublicKey)
} else {
storage.updateMembers(groupID, newMembers.map { Address.fromSerialized(it) })
// Update zombie members
storage.setZombieMembers(groupID, zombies.minus(removedMembers).map { Address.fromSerialized(it) })
// Only update the group in storage if it isn't invalidated by the config state
if (storage.canPerformConfigChange(SharedConfigMessage.Kind.GROUPS.name, userPublicKey, message.sentTimestamp!!)) {
// Admin should send a MEMBERS_LEFT message but handled here just in case
if (didAdminLeave || wasCurrentUserRemoved) {
disableLocalGroupAndUnsubscribe(groupPublicKey, groupID, userPublicKey, true)
return
} else {
storage.updateMembers(groupID, newMembers.map { Address.fromSerialized(it) })
// Update zombie members
storage.setZombieMembers(groupID, zombies.minus(removedMembers).map { Address.fromSerialized(it) })
}
}
// Notify the user
@@ -731,24 +818,30 @@ private fun MessageReceiver.handleClosedGroupMemberLeft(message: ClosedGroupCont
val didAdminLeave = admins.contains(senderPublicKey)
val updatedMemberList = members - senderPublicKey
val userLeft = (userPublicKey == senderPublicKey)
if (didAdminLeave || userLeft) {
disableLocalGroupAndUnsubscribe(groupPublicKey, groupID, userPublicKey)
} else {
storage.updateMembers(groupID, updatedMemberList.map { Address.fromSerialized(it) })
// Update zombie members
val zombies = storage.getZombieMembers(groupID)
storage.setZombieMembers(groupID, zombies.plus(senderPublicKey).map { Address.fromSerialized(it) })
// Only update the group in storage if it isn't invalidated by the config state
if (storage.canPerformConfigChange(SharedConfigMessage.Kind.GROUPS.name, userPublicKey, message.sentTimestamp!!)) {
if (didAdminLeave || userLeft) {
disableLocalGroupAndUnsubscribe(groupPublicKey, groupID, userPublicKey, delete = userLeft)
if (userLeft) {
return
}
} else {
storage.updateMembers(groupID, updatedMemberList.map { Address.fromSerialized(it) })
// Update zombie members
val zombies = storage.getZombieMembers(groupID)
storage.setZombieMembers(groupID, zombies.plus(senderPublicKey).map { Address.fromSerialized(it) })
}
}
// Notify the user
if (userLeft) {
val threadID = storage.getOrCreateThreadIdFor(Address.fromSerialized(groupID))
storage.insertOutgoingInfoMessage(context, groupID, SignalServiceGroup.Type.QUIT, name, members, admins, threadID, message.sentTimestamp!!)
} else {
if (!userLeft) {
storage.insertIncomingInfoMessage(context, senderPublicKey, groupID, SignalServiceGroup.Type.QUIT, name, members, admins, message.sentTimestamp!!)
}
}
private fun isValidGroupUpdate(group: GroupRecord, sentTimestamp: Long, senderPublicKey: String): Boolean {
private fun isValidGroupUpdate(group: GroupRecord, sentTimestamp: Long, senderPublicKey: String): Boolean {
val oldMembers = group.members.map { it.serialize() }
// Check that the message isn't from before the group was created
if (group.formationTimestamp > sentTimestamp) {
@@ -763,7 +856,7 @@ private fun isValidGroupUpdate(group: GroupRecord, sentTimestamp: Long, senderPu
return true
}
fun MessageReceiver.disableLocalGroupAndUnsubscribe(groupPublicKey: String, groupID: String, userPublicKey: String) {
fun MessageReceiver.disableLocalGroupAndUnsubscribe(groupPublicKey: String, groupID: String, userPublicKey: String, delete: Boolean) {
val storage = MessagingModuleConfiguration.shared.storage
storage.removeClosedGroupPublicKey(groupPublicKey)
// Remove the key pairs
@@ -775,5 +868,11 @@ fun MessageReceiver.disableLocalGroupAndUnsubscribe(groupPublicKey: String, grou
PushNotificationAPI.performOperation(PushNotificationAPI.ClosedGroupOperation.Unsubscribe, groupPublicKey, userPublicKey)
// Stop polling
ClosedGroupPollerV2.shared.stopPolling(groupPublicKey)
if (delete) {
val threadId = storage.getOrCreateThreadIdFor(Address.fromSerialized(groupID))
storage.cancelPendingMessageSendJobs(threadId)
storage.deleteConversation(threadId)
}
}
// endregion

View File

@@ -12,6 +12,7 @@ import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.messaging.jobs.OpenGroupDeleteJob
import org.session.libsession.messaging.jobs.TrimThreadJob
import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.open_groups.Endpoint
@@ -169,6 +170,7 @@ class OpenGroupPoller(private val server: String, private val executorService: S
is Endpoint.Outbox, is Endpoint.OutboxSince -> {
handleDirectMessages(server, true, response.body as List<OpenGroupApi.DirectMessage>)
}
else -> { /* We don't care about the result of any other calls (won't be polled for) */}
}
if (secondToLastJob == null && !isCaughtUp) {
isCaughtUp = true
@@ -205,7 +207,7 @@ class OpenGroupPoller(private val server: String, private val executorService: S
val storage = MessagingModuleConfiguration.shared.storage
storage.setServerCapabilities(server, capabilities.capabilities)
}
private fun handleMessages(
server: String,
roomToken: String,
@@ -260,7 +262,8 @@ class OpenGroupPoller(private val server: String, private val executorService: S
null,
fromOutbox,
if (fromOutbox) it.recipient else it.sender,
serverPublicKey
serverPublicKey,
emptySet() // this shouldn't be necessary as we are polling open groups here
)
if (fromOutbox) {
val mapping = mappingCache[it.recipient] ?: storage.getOrCreateBlindedIdMapping(
@@ -277,7 +280,8 @@ class OpenGroupPoller(private val server: String, private val executorService: S
}
mappingCache[it.recipient] = mapping
}
MessageReceiver.handle(message, proto, null)
val threadId = Message.getThreadId(message, null, MessagingModuleConfiguration.shared.storage, false)
MessageReceiver.handle(message, proto, threadId ?: -1, null)
} catch (e: Exception) {
Log.e("Loki", "Couldn't handle direct message", e)
}

View File

@@ -1,5 +1,14 @@
package org.session.libsession.messaging.sending_receiving.pollers
import android.util.SparseArray
import androidx.core.util.valueIterator
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import network.loki.messenger.libsession_util.ConfigBase
import network.loki.messenger.libsession_util.Contacts
import network.loki.messenger.libsession_util.ConversationVolatileConfig
import network.loki.messenger.libsession_util.UserGroupsConfig
import network.loki.messenger.libsession_util.UserProfile
import nl.komponents.kovenant.Deferred
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.deferred
@@ -10,17 +19,23 @@ import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.jobs.BatchMessageReceiveJob
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveParameters
import org.session.libsession.messaging.messages.control.SharedConfigurationMessage
import org.session.libsession.messaging.sending_receiving.MessageReceiver
import org.session.libsession.snode.RawResponse
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.snode.SnodeModule
import org.session.libsession.utilities.ConfigFactoryProtocol
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.Snode
import java.security.SecureRandom
import java.util.Timer
import java.util.TimerTask
import kotlin.time.Duration.Companion.days
private class PromiseCanceledException : Exception("Promise canceled.")
class Poller {
class Poller(private val configFactory: ConfigFactoryProtocol, debounceTimer: Timer) {
var userPublicKey = MessagingModuleConfiguration.shared.storage.getUserPublicKey() ?: ""
private var hasStarted: Boolean = false
private val usedSnodes: MutableSet<Snode> = mutableSetOf()
@@ -97,23 +112,159 @@ class Poller {
}
}
private fun processPersonalMessages(snode: Snode, rawMessages: RawResponse) {
val messages = SnodeAPI.parseRawMessagesResponse(rawMessages, snode, userPublicKey)
val parameters = messages.map { (envelope, serverHash) ->
MessageReceiveParameters(envelope.toByteArray(), serverHash = serverHash)
}
parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk ->
val job = BatchMessageReceiveJob(chunk)
JobQueue.shared.add(job)
}
}
private fun processConfig(snode: Snode, rawMessages: RawResponse, namespace: Int, forConfigObject: ConfigBase?) {
if (forConfigObject == null) return
val messages = SnodeAPI.parseRawMessagesResponse(
rawMessages,
snode,
userPublicKey,
namespace,
updateLatestHash = true,
updateStoredHashes = true,
)
if (messages.isEmpty()) {
// no new messages to process
return
}
var latestMessageTimestamp: Long? = null
messages.forEach { (envelope, hash) ->
try {
val (message, _) = MessageReceiver.parse(data = envelope.toByteArray(),
// assume no groups in personal poller messages
openGroupServerID = null, currentClosedGroups = emptySet()
)
// sanity checks
if (message !is SharedConfigurationMessage) {
Log.w("Loki", "shared config message handled in configs wasn't SharedConfigurationMessage but was ${message.javaClass.simpleName}")
return@forEach
}
forConfigObject.merge(hash!! to message.data)
latestMessageTimestamp = if ((message.sentTimestamp ?: 0L) > (latestMessageTimestamp ?: 0L)) { message.sentTimestamp } else { latestMessageTimestamp }
} catch (e: Exception) {
Log.e("Loki", e)
}
}
// process new results
if (forConfigObject.needsDump()) {
configFactory.persist(forConfigObject, latestMessageTimestamp ?: SnodeAPI.nowWithOffset)
}
}
private fun poll(snode: Snode, deferred: Deferred<Unit, Exception>): Promise<Unit, Exception> {
if (!hasStarted) { return Promise.ofFail(PromiseCanceledException()) }
return SnodeAPI.getRawMessages(snode, userPublicKey).bind { rawResponse ->
isCaughtUp = true
if (deferred.promise.isDone()) {
task { Unit } // The long polling connection has been canceled; don't recurse
} else {
val messages = SnodeAPI.parseRawMessagesResponse(rawResponse, snode, userPublicKey)
val parameters = messages.map { (envelope, serverHash) ->
MessageReceiveParameters(envelope.toByteArray(), serverHash = serverHash)
return task {
runBlocking(Dispatchers.IO) {
val requestSparseArray = SparseArray<SnodeAPI.SnodeBatchRequestInfo>()
// get messages
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(snode, userPublicKey, maxSize = -2)!!.also { personalMessages ->
// namespaces here should always be set
requestSparseArray[personalMessages.namespace!!] = personalMessages
}
parameters.chunked(BatchMessageReceiveJob.BATCH_DEFAULT_NUMBER).forEach { chunk ->
val job = BatchMessageReceiveJob(chunk)
JobQueue.shared.add(job)
// get the latest convo info volatile
val hashesToExtend = mutableSetOf<String>()
configFactory.getUserConfigs().mapNotNull { config ->
hashesToExtend += config.currentHashes()
SnodeAPI.buildAuthenticatedRetrieveBatchRequest(
snode, userPublicKey,
config.configNamespace(),
maxSize = -8
)
}.forEach { request ->
// namespaces here should always be set
requestSparseArray[request.namespace!!] = request
}
poll(snode, deferred)
val requests =
requestSparseArray.valueIterator().asSequence().toMutableList()
if (hashesToExtend.isNotEmpty()) {
SnodeAPI.buildAuthenticatedAlterTtlBatchRequest(
messageHashes = hashesToExtend.toList(),
publicKey = userPublicKey,
newExpiry = SnodeAPI.nowWithOffset + 14.days.inWholeMilliseconds,
extend = true
)?.let { extensionRequest ->
requests += extensionRequest
}
}
SnodeAPI.getRawBatchResponse(snode, userPublicKey, requests).bind { rawResponses ->
isCaughtUp = true
if (deferred.promise.isDone()) {
return@bind Promise.ofSuccess(Unit)
} else {
val responseList = (rawResponses["results"] as List<RawResponse>)
// in case we had null configs, the array won't be fully populated
// index of the sparse array key iterator should be the request index, with the key being the namespace
// TODO: add in specific ordering of config namespaces for processing
listOfNotNull(
configFactory.user?.configNamespace(),
configFactory.contacts?.configNamespace(),
configFactory.userGroups?.configNamespace(),
configFactory.convoVolatile?.configNamespace()
).map {
it to requestSparseArray.indexOfKey(it)
}.filter { (_, i) -> i >= 0 }.forEach { (key, requestIndex) ->
responseList.getOrNull(requestIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e("Loki", "Batch sub-request had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
return@forEach
}
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e("Loki", "Batch sub-request didn't contain a body")
return@forEach
}
if (key == Namespace.DEFAULT) {
return@forEach // continue, skip default namespace
} else {
when (ConfigBase.kindFor(key)) {
UserProfile::class.java -> processConfig(snode, body, key, configFactory.user)
Contacts::class.java -> processConfig(snode, body, key, configFactory.contacts)
ConversationVolatileConfig::class.java -> processConfig(snode, body, key, configFactory.convoVolatile)
UserGroupsConfig::class.java -> processConfig(snode, body, key, configFactory.userGroups)
}
}
}
}
// the first response will be the personal messages (we want these to be processed after config messages)
val personalResponseIndex = requestSparseArray.indexOfKey(Namespace.DEFAULT)
if (personalResponseIndex >= 0) {
responseList.getOrNull(personalResponseIndex)?.let { rawResponse ->
if (rawResponse["code"] as? Int != 200) {
Log.e("Loki", "Batch sub-request for personal messages had non-200 response code, returned code ${(rawResponse["code"] as? Int) ?: "[unknown]"}")
} else {
val body = rawResponse["body"] as? RawResponse
if (body == null) {
Log.e("Loki", "Batch sub-request for personal messages didn't contain a body")
} else {
processPersonalMessages(snode, body)
}
}
}
}
poll(snode, deferred)
}
}.fail {
Log.e("Loki", "Failed to get raw batch response", it)
poll(snode, deferred)
}
}
}
}

View File

@@ -74,6 +74,7 @@ object UpdateMessageBuilder {
context.getString(R.string.ConversationItem_group_action_left, senderName)
}
}
is UpdateMessageData.Kind.OpenGroupInvitation -> { /*Handled externally*/ }
}
return message
}

View File

@@ -419,6 +419,8 @@ object OnionRequestAPI {
Log.d("Loki","Destination server returned ${exception.statusCode}")
} else if (message == "Loki Server error") {
Log.d("Loki", "message was $message")
} else if (exception.statusCode == 404) {
// 404 is probably file server missing a file, don't rebuild path or mark a snode as bad here
} else { // Only drop snode/path if not receiving above two exception cases
handleUnspecificError()
}
@@ -446,8 +448,8 @@ object OnionRequestAPI {
val payloadData = JsonUtil.toJson(payload).toByteArray()
return sendOnionRequest(Destination.Snode(snode), payloadData, version).recover { exception ->
val error = when (exception) {
is HTTP.HTTPRequestFailedException -> SnodeAPI.handleSnodeError(exception.statusCode, exception.json, snode, publicKey)
is HTTPRequestFailedAtDestinationException -> SnodeAPI.handleSnodeError(exception.statusCode, exception.json, snode, publicKey)
is HTTP.HTTPRequestFailedException -> SnodeAPI.handleSnodeError(exception.statusCode, exception.json, snode, publicKey)
else -> null
}
if (error != null) { throw error }

View File

@@ -28,12 +28,12 @@ import org.session.libsignal.utilities.HTTP
import org.session.libsignal.utilities.Hex
import org.session.libsignal.utilities.JsonUtil
import org.session.libsignal.utilities.Log
import org.session.libsignal.utilities.Namespace
import org.session.libsignal.utilities.Snode
import org.session.libsignal.utilities.ThreadUtils
import org.session.libsignal.utilities.prettifiedDescription
import org.session.libsignal.utilities.retryIfNeeded
import java.security.SecureRandom
import java.util.Date
import java.util.Locale
import kotlin.collections.component1
import kotlin.collections.component2
@@ -102,6 +102,14 @@ object SnodeAPI {
object ValidationFailed : Error("ONS name validation failed.")
}
// Batch
data class SnodeBatchRequestInfo(
val method: String,
val params: Map<String, Any>,
@Transient
val namespace: Int?
) // assume signatures, pubkey and namespaces are attached in parameters if required
// Internal API
internal fun invoke(
method: Snode.Method,
@@ -319,26 +327,32 @@ object SnodeAPI {
fun getRawMessages(snode: Snode, publicKey: String, requiresAuth: Boolean = true, namespace: Int = 0): RawResponsePromise {
// Get last message hash
val lastHashValue = database.getLastMessageHashValue(snode, publicKey, namespace) ?: ""
val parameters = mutableMapOf<String,Any>(
val parameters = mutableMapOf<String, Any>(
"pubKey" to publicKey,
"last_hash" to lastHashValue,
)
// Construct signature
if (requiresAuth) {
val userED25519KeyPair = try {
MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return Promise.ofFail(Error.NoKeyPair)
MessagingModuleConfiguration.shared.getUserED25519KeyPair()
?: return Promise.ofFail(Error.NoKeyPair)
} catch (e: Exception) {
Log.e("Loki", "Error getting KeyPair", e)
return Promise.ofFail(Error.NoKeyPair)
}
val timestamp = Date().time + SnodeAPI.clockOffset
val timestamp = System.currentTimeMillis() + clockOffset
val ed25519PublicKey = userED25519KeyPair.publicKey.asHexString
val signature = ByteArray(Sign.BYTES)
val verificationData =
if (namespace != 0) "retrieve$namespace$timestamp".toByteArray()
else "retrieve$timestamp".toByteArray()
try {
sodium.cryptoSignDetached(signature, verificationData, verificationData.size.toLong(), userED25519KeyPair.secretKey.asBytes)
sodium.cryptoSignDetached(
signature,
verificationData,
verificationData.size.toLong(),
userED25519KeyPair.secretKey.asBytes
)
} catch (exception: Exception) {
return Promise.ofFail(Error.SigningFailed)
}
@@ -354,7 +368,251 @@ object SnodeAPI {
}
// Make the request
return invoke(Snode.Method.GetMessages, snode, parameters, publicKey)
return invoke(Snode.Method.Retrieve, snode, parameters, publicKey)
}
fun buildAuthenticatedStoreBatchInfo(publicKey: String, namespace: Int, message: SnodeMessage): SnodeBatchRequestInfo? {
val params = mutableMapOf<String, Any>()
// load the message data params into the sub request
// currently loads:
// pubKey
// data
// ttl
// timestamp
params.putAll(message.toJSON())
params["namespace"] = namespace
// used for sig generation since it is also the value used in timestamp parameter
val messageTimestamp = message.timestamp
val userEd25519KeyPair = try {
MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return null
} catch (e: Exception) {
return null
}
val ed25519PublicKey = userEd25519KeyPair.publicKey.asHexString
val signature = ByteArray(Sign.BYTES)
val verificationData = "store$namespace$messageTimestamp".toByteArray()
try {
sodium.cryptoSignDetached(
signature,
verificationData,
verificationData.size.toLong(),
userEd25519KeyPair.secretKey.asBytes
)
} catch (e: Exception) {
Log.e("Loki", "Signing data failed with user secret key", e)
}
// timestamp already set
params["pubkey_ed25519"] = ed25519PublicKey
params["signature"] = Base64.encodeBytes(signature)
return SnodeBatchRequestInfo(
Snode.Method.SendMessage.rawValue,
params,
namespace
)
}
/**
* Message hashes can be shared across multiple namespaces (for a single public key destination)
* @param publicKey the destination's identity public key to delete from (05...)
* @param messageHashes a list of stored message hashes to delete from the server
* @param required indicates that *at least one* message in the list is deleted from the server, otherwise it will return 404
*/
fun buildAuthenticatedDeleteBatchInfo(publicKey: String, messageHashes: List<String>, required: Boolean = false): SnodeBatchRequestInfo? {
val params = mutableMapOf(
"pubkey" to publicKey,
"required" to required, // could be omitted technically but explicit here
"messages" to messageHashes
)
val userEd25519KeyPair = try {
MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return null
} catch (e: Exception) {
return null
}
val ed25519PublicKey = userEd25519KeyPair.publicKey.asHexString
val signature = ByteArray(Sign.BYTES)
val verificationData = "delete${messageHashes.joinToString("")}".toByteArray()
try {
sodium.cryptoSignDetached(
signature,
verificationData,
verificationData.size.toLong(),
userEd25519KeyPair.secretKey.asBytes
)
} catch (e: Exception) {
Log.e("Loki", "Signing data failed with user secret key", e)
return null
}
params["pubkey_ed25519"] = ed25519PublicKey
params["signature"] = Base64.encodeBytes(signature)
return SnodeBatchRequestInfo(
Snode.Method.DeleteMessage.rawValue,
params,
null
)
}
fun buildAuthenticatedRetrieveBatchRequest(snode: Snode, publicKey: String, namespace: Int = 0, maxSize: Int? = null): SnodeBatchRequestInfo? {
val lastHashValue = database.getLastMessageHashValue(snode, publicKey, namespace) ?: ""
val params = mutableMapOf<String, Any>(
"pubkey" to publicKey,
"last_hash" to lastHashValue,
)
val userEd25519KeyPair = try {
MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return null
} catch (e: Exception) {
return null
}
val ed25519PublicKey = userEd25519KeyPair.publicKey.asHexString
val timestamp = System.currentTimeMillis() + clockOffset
val signature = ByteArray(Sign.BYTES)
val verificationData = if (namespace == 0) "retrieve$timestamp".toByteArray()
else "retrieve$namespace$timestamp".toByteArray()
try {
sodium.cryptoSignDetached(
signature,
verificationData,
verificationData.size.toLong(),
userEd25519KeyPair.secretKey.asBytes
)
} catch (e: Exception) {
Log.e("Loki", "Signing data failed with user secret key", e)
return null
}
params["timestamp"] = timestamp
params["pubkey_ed25519"] = ed25519PublicKey
params["signature"] = Base64.encodeBytes(signature)
if (namespace != 0) {
params["namespace"] = namespace
}
if (maxSize != null) {
params["max_size"] = maxSize
}
return SnodeBatchRequestInfo(
Snode.Method.Retrieve.rawValue,
params,
namespace
)
}
fun buildAuthenticatedAlterTtlBatchRequest(
messageHashes: List<String>,
newExpiry: Long,
publicKey: String,
shorten: Boolean = false,
extend: Boolean = false): SnodeBatchRequestInfo? {
val params = buildAlterTtlParams(messageHashes, newExpiry, publicKey, extend, shorten) ?: return null
return SnodeBatchRequestInfo(
Snode.Method.Expire.rawValue,
params,
null
)
}
fun getRawBatchResponse(snode: Snode, publicKey: String, requests: List<SnodeBatchRequestInfo>, sequence: Boolean = false): RawResponsePromise {
val parameters = mutableMapOf<String, Any>(
"requests" to requests
)
return invoke(if (sequence) Snode.Method.Sequence else Snode.Method.Batch, snode, parameters, publicKey).success { rawResponses ->
val responseList = (rawResponses["results"] as List<RawResponse>)
responseList.forEachIndexed { index, response ->
if (response["code"] as? Int != 200) {
Log.w("Loki", "response code was not 200")
handleSnodeError(
response["code"] as? Int ?: 0,
response,
snode,
publicKey
)
}
}
}
}
fun getExpiries(messageHashes: List<String>, publicKey: String) : RawResponsePromise {
val userEd25519KeyPair = MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return Promise.ofFail(NullPointerException("No user key pair"))
return retryIfNeeded(maxRetryCount) {
val timestamp = System.currentTimeMillis() + clockOffset
val params = mutableMapOf(
"pubkey" to publicKey,
"messages" to messageHashes,
"timestamp" to timestamp
)
val signData = "${Snode.Method.GetExpiries.rawValue}$timestamp${messageHashes.joinToString(separator = "")}".toByteArray()
val ed25519PublicKey = userEd25519KeyPair.publicKey.asHexString
val signature = ByteArray(Sign.BYTES)
try {
sodium.cryptoSignDetached(
signature,
signData,
signData.size.toLong(),
userEd25519KeyPair.secretKey.asBytes
)
} catch (e: Exception) {
Log.e("Loki", "Signing data failed with user secret key", e)
return@retryIfNeeded Promise.ofFail(e)
}
params["pubkey_ed25519"] = ed25519PublicKey
params["signature"] = Base64.encodeBytes(signature)
getSingleTargetSnode(publicKey).bind { snode ->
invoke(Snode.Method.GetExpiries, snode, params, publicKey)
}
}
}
fun alterTtl(messageHashes: List<String>, newExpiry: Long, publicKey: String, extend: Boolean = false, shorten: Boolean = false): RawResponsePromise {
return retryIfNeeded(maxRetryCount) {
val params = buildAlterTtlParams(messageHashes, newExpiry, publicKey, extend, shorten)
?: return@retryIfNeeded Promise.ofFail(
Exception("Couldn't build signed params for alterTtl request for newExpiry=$newExpiry, extend=$extend, shorten=$shorten")
)
getSingleTargetSnode(publicKey).bind { snode ->
invoke(Snode.Method.Expire, snode, params, publicKey)
}
}
}
private fun buildAlterTtlParams( // TODO: in future this will probably need to use the closed group subkeys / admin keys for group swarms
messageHashes: List<String>,
newExpiry: Long,
publicKey: String,
extend: Boolean = false,
shorten: Boolean = false): Map<String, Any>? {
val userEd25519KeyPair = MessagingModuleConfiguration.shared.getUserED25519KeyPair() ?: return null
val params = mutableMapOf(
"expiry" to newExpiry,
"messages" to messageHashes,
)
if (extend) {
params["extend"] = true
} else if (shorten) {
params["shorten"] = true
}
val shortenOrExtend = if (extend) "extend" else if (shorten) "shorten" else ""
val signData = "${Snode.Method.Expire.rawValue}$shortenOrExtend$newExpiry${messageHashes.joinToString(separator = "")}".toByteArray()
val ed25519PublicKey = userEd25519KeyPair.publicKey.asHexString
val signature = ByteArray(Sign.BYTES)
try {
sodium.cryptoSignDetached(
signature,
signData,
signData.size.toLong(),
userEd25519KeyPair.secretKey.asBytes
)
} catch (e: Exception) {
Log.e("Loki", "Signing data failed with user secret key", e)
return null
}
params["pubkey"] = publicKey
params["pubkey_ed25519"] = ed25519PublicKey
params["signature"] = Base64.encodeBytes(signature)
return params
}
fun getMessages(publicKey: String): MessageListPromise {
@@ -483,13 +741,14 @@ object SnodeAPI {
retryIfNeeded(maxRetryCount) {
getNetworkTime(snode).bind { (_, timestamp) ->
val signature = ByteArray(Sign.BYTES)
val verificationData = (Snode.Method.DeleteAll.rawValue + timestamp.toString()).toByteArray()
val verificationData = (Snode.Method.DeleteAll.rawValue + Namespace.ALL + timestamp.toString()).toByteArray()
sodium.cryptoSignDetached(signature, verificationData, verificationData.size.toLong(), userED25519KeyPair.secretKey.asBytes)
val deleteMessageParams = mapOf(
"pubkey" to userPublicKey,
"pubkey_ed25519" to userED25519KeyPair.publicKey.asHexString,
"timestamp" to timestamp,
"signature" to Base64.encodeBytes(signature)
"signature" to Base64.encodeBytes(signature),
"namespace" to Namespace.ALL,
)
invoke(Snode.Method.DeleteAll, snode, deleteMessageParams, userPublicKey).map {
rawResponse -> parseDeletions(userPublicKey, timestamp, rawResponse)
@@ -502,11 +761,13 @@ object SnodeAPI {
}
}
fun parseRawMessagesResponse(rawResponse: RawResponse, snode: Snode, publicKey: String, namespace: Int = 0): List<Pair<SignalServiceProtos.Envelope, String?>> {
fun parseRawMessagesResponse(rawResponse: RawResponse, snode: Snode, publicKey: String, namespace: Int = 0, updateLatestHash: Boolean = true, updateStoredHashes: Boolean = true): List<Pair<SignalServiceProtos.Envelope, String?>> {
val messages = rawResponse["messages"] as? List<*>
return if (messages != null) {
updateLastMessageHashValueIfPossible(snode, publicKey, messages, namespace)
val newRawMessages = removeDuplicates(publicKey, messages, namespace)
if (updateLatestHash) {
updateLastMessageHashValueIfPossible(snode, publicKey, messages, namespace)
}
val newRawMessages = removeDuplicates(publicKey, messages, namespace, updateStoredHashes)
return parseEnvelopes(newRawMessages)
} else {
listOf()
@@ -523,7 +784,7 @@ object SnodeAPI {
}
}
private fun removeDuplicates(publicKey: String, rawMessages: List<*>, namespace: Int): List<*> {
private fun removeDuplicates(publicKey: String, rawMessages: List<*>, namespace: Int, updateStoredHashes: Boolean): List<*> {
val originalMessageHashValues = database.getReceivedMessageHashValues(publicKey, namespace)?.toMutableSet() ?: mutableSetOf()
val receivedMessageHashValues = originalMessageHashValues.toMutableSet()
val result = rawMessages.filter { rawMessage ->
@@ -538,7 +799,7 @@ object SnodeAPI {
false
}
}
if (originalMessageHashValues != receivedMessageHashValues) {
if (originalMessageHashValues != receivedMessageHashValues && updateStoredHashes) {
database.setReceivedMessageHashValues(publicKey, receivedMessageHashValues, namespace)
}
return result
@@ -575,11 +836,11 @@ object SnodeAPI {
Log.e("Loki", "Failed to delete all messages from: $hexSnodePublicKey due to error: $reason ($statusCode).")
false
} else {
val hashes = json["deleted"] as List<String> // Hashes of deleted messages
val hashes = (json["deleted"] as Map<String,List<String>>).flatMap { (_, hashes) -> hashes }.sorted() // Hashes of deleted messages
val signature = json["signature"] as String
val snodePublicKey = Key.fromHexString(hexSnodePublicKey)
// The signature looks like ( PUBKEY_HEX || TIMESTAMP || DELETEDHASH[0] || ... || DELETEDHASH[N] )
val message = (userPublicKey + timestamp.toString() + hashes.fold("") { a, v -> a + v }).toByteArray()
val message = (userPublicKey + timestamp.toString() + hashes.joinToString(separator = "")).toByteArray()
sodium.cryptoSignVerifyDetached(Base64.decode(signature), message, message.size, snodePublicKey.asBytes)
}
}
@@ -635,6 +896,10 @@ object SnodeAPI {
Log.d("Loki", "Got a 421 without an associated public key.")
}
}
404 -> {
Log.d("Loki", "404, probably no file found")
return Error.Generic
}
else -> {
handleBadSnode()
Log.d("Loki", "Unhandled response code: ${statusCode}.")

View File

@@ -5,11 +5,11 @@ import android.os.Parcel
import android.os.Parcelable
import android.util.Pair
import androidx.annotation.VisibleForTesting
import org.session.libsession.utilities.DelimiterUtil
import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.utilities.guava.Optional
import org.session.libsignal.utilities.IdPrefix
import org.session.libsignal.utilities.Util
import java.util.*
import org.session.libsignal.utilities.guava.Optional
import java.util.Collections
import java.util.LinkedList
import java.util.concurrent.atomic.AtomicReference
import java.util.regex.Matcher
import java.util.regex.Pattern
@@ -27,6 +27,8 @@ class Address private constructor(address: String) : Parcelable, Comparable<Addr
get() = GroupUtil.isOpenGroup(address)
val isOpenGroupInbox: Boolean
get() = GroupUtil.isOpenGroupInbox(address)
val isOpenGroupOutbox: Boolean
get() = address.startsWith(IdPrefix.BLINDED.value) || address.startsWith(IdPrefix.BLINDEDV2.value)
val isContact: Boolean
get() = !(isGroup || isOpenGroupInbox)

View File

@@ -0,0 +1,23 @@
package org.session.libsession.utilities
import network.loki.messenger.libsession_util.ConfigBase
import network.loki.messenger.libsession_util.Contacts
import network.loki.messenger.libsession_util.ConversationVolatileConfig
import network.loki.messenger.libsession_util.UserGroupsConfig
import network.loki.messenger.libsession_util.UserProfile
interface ConfigFactoryProtocol {
val user: UserProfile?
val contacts: Contacts?
val convoVolatile: ConversationVolatileConfig?
val userGroups: UserGroupsConfig?
fun getUserConfigs(): List<ConfigBase>
fun persist(forConfigObject: ConfigBase, timestamp: Long)
fun conversationInConfig(publicKey: String?, groupPublicKey: String?, openGroupId: String?, visibleOnly: Boolean): Boolean
fun canPerformChange(variant: String, publicKey: String, changeTimestampMs: Long): Boolean
}
interface ConfigFactoryUpdateListener {
fun notifyUpdates(forConfigObject: ConfigBase)
}

View File

@@ -4,7 +4,9 @@ import okhttp3.HttpUrl
import org.session.libsession.messaging.file_server.FileServerApi
import org.session.libsignal.utilities.HTTP
import org.session.libsignal.utilities.Log
import java.io.*
import java.io.File
import java.io.FileOutputStream
import java.io.OutputStream
object DownloadUtilities {
@@ -14,7 +16,7 @@ object DownloadUtilities {
@JvmStatic
fun downloadFile(destination: File, url: String) {
val outputStream = FileOutputStream(destination) // Throws
var remainingAttempts = 4
var remainingAttempts = 2
var exception: Exception? = null
while (remainingAttempts > 0) {
remainingAttempts -= 1

View File

@@ -1,9 +1,9 @@
package org.session.libsession.utilities
import network.loki.messenger.libsession_util.util.GroupInfo
import org.session.libsignal.messages.SignalServiceGroup
import org.session.libsignal.utilities.Hex
import java.io.IOException
import kotlin.jvm.Throws
object GroupUtil {
const val CLOSED_GROUP_PREFIX = "__textsecure_group__!"
@@ -97,4 +97,28 @@ object GroupUtil {
fun doubleDecodeGroupID(groupID: String): ByteArray {
return getDecodedGroupIDAsData(getDecodedGroupID(groupID))
}
@JvmStatic
@Throws(IOException::class)
fun doubleDecodeGroupId(groupID: String): String {
return Hex.toStringCondensed(getDecodedGroupIDAsData(getDecodedGroupID(groupID)))
}
fun createConfigMemberMap(
members: Collection<String>,
admins: Collection<String>
): Map<String, Boolean> {
// Start with admins
val memberMap = admins.associate {
it to true
}.toMutableMap()
// Add the remaining members (there may be duplicates, so only add ones that aren't already in there from admins)
for (member in members) {
if (!memberMap.contains(member)) {
memberMap[member] = false
}
}
return memberMap
}
}

View File

@@ -1,23 +1,24 @@
package org.session.libsession.utilities;
import android.content.Context;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import org.session.libsignal.utilities.Base64;
import org.session.libsession.utilities.TextSecurePreferences;
import org.session.libsession.utilities.Util;
import java.io.IOException;
public class ProfileKeyUtil {
public static final int PROFILE_KEY_BYTES = 32;
public static synchronized @NonNull byte[] getProfileKey(@NonNull Context context) {
try {
String encodedProfileKey = TextSecurePreferences.getProfileKey(context);
if (encodedProfileKey == null) {
encodedProfileKey = Util.getSecret(32);
encodedProfileKey = Util.getSecret(PROFILE_KEY_BYTES);
TextSecurePreferences.setProfileKey(context, encodedProfileKey);
}
@@ -36,7 +37,7 @@ public class ProfileKeyUtil {
}
public static synchronized @NonNull String generateEncodedProfileKey(@NonNull Context context) {
return Util.getSecret(32);
return Util.getSecret(PROFILE_KEY_BYTES);
}
public static synchronized void setEncodedProfileKey(@NonNull Context context, @Nullable String key) {

View File

@@ -1,9 +1,9 @@
package org.session.libsession.utilities
import android.content.Context
import org.session.libsession.messaging.contacts.Contact
import org.session.libsession.messaging.messages.control.ExpirationTimerUpdate
import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier
import org.session.libsession.utilities.Address
import org.session.libsession.utilities.recipients.Recipient
class SSKEnvironment(
@@ -30,10 +30,10 @@ class SSKEnvironment(
}
fun setNickname(context: Context, recipient: Recipient, nickname: String?)
fun setName(context: Context, recipient: Recipient, name: String)
fun setProfilePictureURL(context: Context, recipient: Recipient, profilePictureURL: String)
fun setProfileKey(context: Context, recipient: Recipient, profileKey: ByteArray)
fun setName(context: Context, recipient: Recipient, name: String?)
fun setProfilePicture(context: Context, recipient: Recipient, profilePictureURL: String?, profileKey: ByteArray?)
fun setUnidentifiedAccessMode(context: Context, recipient: Recipient, unidentifiedAccessMode: Recipient.UnidentifiedAccessMode)
fun contactUpdatedInternal(contact: Contact): String?
}
interface MessageExpirationManagerProtocol {

View File

@@ -12,7 +12,6 @@ import dagger.hilt.android.qualifiers.ApplicationContext
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import org.session.libsession.BuildConfig
import org.session.libsession.R
import org.session.libsession.utilities.TextSecurePreferences.Companion.AUTOPLAY_AUDIO_MESSAGES
import org.session.libsession.utilities.TextSecurePreferences.Companion.CALL_NOTIFICATIONS_ENABLED
@@ -103,6 +102,8 @@ interface TextSecurePreferences {
fun setUpdateApkDigest(value: String?)
fun getUpdateApkDigest(): String?
fun getLocalNumber(): String?
fun getHasLegacyConfig(): Boolean
fun setHasLegacyConfig(newValue: Boolean)
fun setLocalNumber(localNumber: String)
fun removeLocalNumber()
fun isEnterSendsEnabled(): Boolean
@@ -178,6 +179,7 @@ interface TextSecurePreferences {
fun setThemeStyle(themeStyle: String)
fun setFollowSystemSettings(followSystemSettings: Boolean)
fun autoplayAudioMessages(): Boolean
fun hasForcedNewConfig(): Boolean
fun hasPreference(key: String): Boolean
fun clearAll()
@@ -264,6 +266,10 @@ interface TextSecurePreferences {
const val AUTOPLAY_AUDIO_MESSAGES = "pref_autoplay_audio"
const val FINGERPRINT_KEY_GENERATED = "fingerprint_key_generated"
const val SELECTED_ACCENT_COLOR = "selected_accent_color"
const val HAS_RECEIVED_LEGACY_CONFIG = "has_received_legacy_config"
const val HAS_FORCED_NEW_CONFIG = "has_forced_new_config"
const val GREEN_ACCENT = "accent_green"
const val BLUE_ACCENT = "accent_blue"
const val PURPLE_ACCENT = "accent_purple"
@@ -625,6 +631,17 @@ interface TextSecurePreferences {
return getStringPreference(context, LOCAL_NUMBER_PREF, null)
}
@JvmStatic
fun getHasLegacyConfig(context: Context): Boolean {
return getBooleanPreference(context, HAS_RECEIVED_LEGACY_CONFIG, false)
}
@JvmStatic
fun setHasLegacyConfig(context: Context, newValue: Boolean) {
setBooleanPreference(context, HAS_RECEIVED_LEGACY_CONFIG, newValue)
_events.tryEmit(HAS_RECEIVED_LEGACY_CONFIG)
}
fun setLocalNumber(context: Context, localNumber: String) {
setStringPreference(context, LOCAL_NUMBER_PREF, localNumber.toLowerCase())
}
@@ -795,6 +812,11 @@ interface TextSecurePreferences {
setIntegerPreference(context, NOTIFICATION_MESSAGES_CHANNEL_VERSION, version)
}
@JvmStatic
fun hasForcedNewConfig(context: Context): Boolean {
return getBooleanPreference(context, HAS_FORCED_NEW_CONFIG, false)
}
@JvmStatic
fun getBooleanPreference(context: Context, key: String?, defaultValue: Boolean): Boolean {
return getDefaultSharedPreferences(context).getBoolean(key, defaultValue)
@@ -1279,6 +1301,15 @@ class AppTextSecurePreferences @Inject constructor(
return getStringPreference(TextSecurePreferences.LOCAL_NUMBER_PREF, null)
}
override fun getHasLegacyConfig(): Boolean {
return getBooleanPreference(TextSecurePreferences.HAS_RECEIVED_LEGACY_CONFIG, false)
}
override fun setHasLegacyConfig(newValue: Boolean) {
setBooleanPreference(TextSecurePreferences.HAS_RECEIVED_LEGACY_CONFIG, newValue)
TextSecurePreferences._events.tryEmit(TextSecurePreferences.HAS_RECEIVED_LEGACY_CONFIG)
}
override fun setLocalNumber(localNumber: String) {
setStringPreference(TextSecurePreferences.LOCAL_NUMBER_PREF, localNumber.toLowerCase())
}
@@ -1422,6 +1453,9 @@ class AppTextSecurePreferences @Inject constructor(
setIntegerPreference(TextSecurePreferences.NOTIFICATION_MESSAGES_CHANNEL_VERSION, version)
}
override fun hasForcedNewConfig(): Boolean =
getBooleanPreference(TextSecurePreferences.HAS_FORCED_NEW_CONFIG, false)
override fun getBooleanPreference(key: String?, defaultValue: Boolean): Boolean {
return getDefaultSharedPreferences(context).getBoolean(key, defaultValue)
}

View File

@@ -99,6 +99,7 @@ public class Recipient implements RecipientModifiedListener {
private boolean profileSharing;
private String notificationChannel;
private boolean forceSmsSelection;
private String wrapperHash;
private @NonNull UnidentifiedAccessMode unidentifiedAccessMode = UnidentifiedAccessMode.ENABLED;
@@ -279,6 +280,7 @@ public class Recipient implements RecipientModifiedListener {
this.profileSharing = details.profileSharing;
this.unidentifiedAccessMode = details.unidentifiedAccessMode;
this.forceSmsSelection = details.forceSmsSelection;
this.wrapperHash = details.wrapperHash;
this.participants.addAll(details.participants);
this.resolving = false;
@@ -325,7 +327,7 @@ public class Recipient implements RecipientModifiedListener {
return contact.displayName(Contact.ContactContext.REGULAR);
} else {
Contact contact = storage.getContactWithSessionID(sessionID);
if (contact == null) { return sessionID; }
if (contact == null) { return null; }
return contact.displayName(Contact.ContactContext.REGULAR);
}
}
@@ -440,6 +442,10 @@ public class Recipient implements RecipientModifiedListener {
return address.isOpenGroup();
}
public boolean isOpenGroupOutboxRecipient() {
return address.isOpenGroupOutbox();
}
public boolean isOpenGroupInboxRecipient() {
return address.isOpenGroupInbox();
}
@@ -483,7 +489,13 @@ public class Recipient implements RecipientModifiedListener {
public synchronized String toShortString() {
String name = getName();
return (name != null ? name : address.serialize());
if (name != null) return name;
String sessionId = address.serialize();
if (sessionId.length() < 4) return sessionId; // so substrings don't throw out of bounds exceptions
int takeAmount = 4;
String start = sessionId.substring(0, takeAmount);
String end = sessionId.substring(sessionId.length()-takeAmount);
return start+"..."+end;
}
public synchronized @NonNull Drawable getFallbackContactPhotoDrawable(Context context, boolean inverted) {
@@ -717,6 +729,14 @@ public class Recipient implements RecipientModifiedListener {
return unidentifiedAccessMode;
}
public String getWrapperHash() {
return wrapperHash;
}
public void setWrapperHash(String wrapperHash) {
this.wrapperHash = wrapperHash;
}
public void setUnidentifiedAccessMode(@NonNull UnidentifiedAccessMode unidentifiedAccessMode) {
synchronized (this) {
this.unidentifiedAccessMode = unidentifiedAccessMode;
@@ -739,12 +759,12 @@ public class Recipient implements RecipientModifiedListener {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Recipient recipient = (Recipient) o;
return resolving == recipient.resolving && mutedUntil == recipient.mutedUntil && notifyType == recipient.notifyType && blocked == recipient.blocked && approved == recipient.approved && approvedMe == recipient.approvedMe && expireMessages == recipient.expireMessages && address.equals(recipient.address) && Objects.equals(name, recipient.name) && Objects.equals(customLabel, recipient.customLabel) && Objects.equals(groupAvatarId, recipient.groupAvatarId) && Arrays.equals(profileKey, recipient.profileKey) && Objects.equals(profileName, recipient.profileName) && Objects.equals(profileAvatar, recipient.profileAvatar);
return resolving == recipient.resolving && mutedUntil == recipient.mutedUntil && notifyType == recipient.notifyType && blocked == recipient.blocked && approved == recipient.approved && approvedMe == recipient.approvedMe && expireMessages == recipient.expireMessages && address.equals(recipient.address) && Objects.equals(name, recipient.name) && Objects.equals(customLabel, recipient.customLabel) && Objects.equals(groupAvatarId, recipient.groupAvatarId) && Arrays.equals(profileKey, recipient.profileKey) && Objects.equals(profileName, recipient.profileName) && Objects.equals(profileAvatar, recipient.profileAvatar) && Objects.equals(wrapperHash, recipient.wrapperHash);
}
@Override
public int hashCode() {
int result = Objects.hash(address, name, customLabel, resolving, groupAvatarId, mutedUntil, notifyType, blocked, approved, approvedMe, expireMessages, profileName, profileAvatar);
int result = Objects.hash(address, name, customLabel, resolving, groupAvatarId, mutedUntil, notifyType, blocked, approved, approvedMe, expireMessages, profileName, profileAvatar, wrapperHash);
result = 31 * result + Arrays.hashCode(profileKey);
return result;
}
@@ -848,6 +868,7 @@ public class Recipient implements RecipientModifiedListener {
private final String notificationChannel;
private final UnidentifiedAccessMode unidentifiedAccessMode;
private final boolean forceSmsSelection;
private final String wrapperHash;
public RecipientSettings(boolean blocked, boolean approved, boolean approvedMe, long muteUntil,
int notifyType,
@@ -869,7 +890,8 @@ public class Recipient implements RecipientModifiedListener {
boolean profileSharing,
@Nullable String notificationChannel,
@NonNull UnidentifiedAccessMode unidentifiedAccessMode,
boolean forceSmsSelection)
boolean forceSmsSelection,
String wrapperHash)
{
this.blocked = blocked;
this.approved = approved;
@@ -895,6 +917,7 @@ public class Recipient implements RecipientModifiedListener {
this.notificationChannel = notificationChannel;
this.unidentifiedAccessMode = unidentifiedAccessMode;
this.forceSmsSelection = forceSmsSelection;
this.wrapperHash = wrapperHash;
}
public @Nullable MaterialColor getColor() {
@@ -992,6 +1015,11 @@ public class Recipient implements RecipientModifiedListener {
public boolean isForceSmsSelection() {
return forceSmsSelection;
}
public String getWrapperHash() {
return wrapperHash;
}
}

View File

@@ -177,6 +177,7 @@ class RecipientProvider {
@Nullable final String notificationChannel;
@NonNull final UnidentifiedAccessMode unidentifiedAccessMode;
final boolean forceSmsSelection;
final String wrapperHash;
RecipientDetails(@Nullable String name, @Nullable Long groupAvatarId,
boolean systemContact, boolean isLocalNumber, @Nullable RecipientSettings settings,
@@ -209,6 +210,7 @@ class RecipientProvider {
this.notificationChannel = settings != null ? settings.getNotificationChannel() : null;
this.unidentifiedAccessMode = settings != null ? settings.getUnidentifiedAccessMode() : UnidentifiedAccessMode.DISABLED;
this.forceSmsSelection = settings != null && settings.isForceSmsSelection();
this.wrapperHash = settings != null ? settings.getWrapperHash() : null;
if (name == null && settings != null) this.name = settings.getSystemDisplayName();
else this.name = name;

View File

@@ -220,18 +220,6 @@
<attr name="emoji_maxLength" format="integer" />
</declare-styleable>
<declare-styleable name="ColorPickerPreference">
<attr name="currentColor" format="reference" />
<attr name="colors" format="reference" />
<attr name="sortColors" format="boolean|reference" />
<attr name="colorDescriptions" format="reference" />
<attr name="columns" format="integer|reference" />
<attr name="colorSize" format="enum|reference">
<enum name="large" value="1" />
<enum name="small" value="2" />
</attr>
</declare-styleable>
<declare-styleable name="VerificationCodeView">
<attr name="vcv_spacing" format="dimension"/>
<attr name="vcv_inputWidth" format="dimension"/>

View File

@@ -1,9 +1,9 @@
package org.session.libsession.utilities
import org.junit.Assert.assertEquals
import org.junit.Test
import org.junit.Assert.*
class OpenGroupUrlParserTest {
class CommunityUrlParserTest {
@Test
fun parseUrlTest() {