From 4fff5ac2dc2b03f4e363335c09a4a63a25195028 Mon Sep 17 00:00:00 2001 From: jubb Date: Fri, 7 May 2021 11:48:03 +1000 Subject: [PATCH 1/4] refactor: make storage reference jobId by string in deletion, don't persist jobs we are about to delete, delete jobs that fail to serialize from storage (probably from corrupt or moved data classes) in temporary message send jobs --- .../securesms/ApplicationContext.java | 1 - .../securesms/database/Storage.kt | 10 +++--- .../loki/database/SessionJobDatabase.kt | 21 ++++++++---- .../libsession/messaging/StorageProtocol.kt | 6 ++-- .../messaging/jobs/AttachmentUploadJob.kt | 2 +- .../libsession/messaging/jobs/JobQueue.kt | 34 ++++++++++++++----- 6 files changed, 49 insertions(+), 25 deletions(-) diff --git a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java index 33e55e18d3c..cc61634f9a6 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java +++ b/app/src/main/java/org/thoughtcrime/securesms/ApplicationContext.java @@ -103,7 +103,6 @@ import kotlin.Unit; import kotlinx.coroutines.Job; import network.loki.messenger.BuildConfig; -import nl.komponents.kovenant.Kovenant; import static nl.komponents.kovenant.android.KovenantAndroid.startKovenant; import static nl.komponents.kovenant.android.KovenantAndroid.stopKovenant; diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 0f449c9f3aa..0ab48243438 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -185,15 +185,15 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, DatabaseFactory.getSessionJobDatabase(context).persistJob(job) } - override fun markJobAsSucceeded(job: Job) { - DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(job) + override fun markJobAsSucceeded(jobId: String) { + DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(jobId) } - override fun markJobAsFailed(job: Job) { - DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(job) + override fun markJobAsFailed(jobId: String) { + DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(jobId) } - override fun getAllPendingJobs(type: String): List { + override fun getAllPendingJobs(type: String): Map { return DatabaseFactory.getSessionJobDatabase(context).getAllPendingJobs(type) } diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt index dda3b7d7eb0..04edd3715e7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/database/SessionJobDatabase.kt @@ -4,6 +4,7 @@ import android.content.ContentValues import android.content.Context import net.sqlcipher.Cursor import org.session.libsession.messaging.jobs.* +import org.session.libsignal.utilities.logging.Log import org.thoughtcrime.securesms.database.Database import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer @@ -30,19 +31,25 @@ class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Databa database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID)) } - fun markJobAsSucceeded(job: Job) { - databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(job.id)) + fun markJobAsSucceeded(jobId: String) { + databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(jobId)) } - fun markJobAsFailed(job: Job) { - databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(job.id)) + fun markJobAsFailed(jobId: String) { + databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(jobId)) } - fun getAllPendingJobs(type: String): List { + fun getAllPendingJobs(type: String): Map { val database = databaseHelper.readableDatabase return database.getAll(sessionJobTable, "$jobType = ?", arrayOf(type)) { cursor -> - jobFromCursor(cursor) - } + val jobId = cursor.getString(jobID) + try { + jobId to jobFromCursor(cursor) + } catch (e: Exception) { + Log.e("Loki", "Error serializing Job of type $type",e) + jobId to null + } + }.toMap() } fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? { diff --git a/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt index f64aa1a032c..da604264d16 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/StorageProtocol.kt @@ -44,9 +44,9 @@ interface StorageProtocol { // Jobs fun persistJob(job: Job) - fun markJobAsSucceeded(job: Job) - fun markJobAsFailed(job: Job) - fun getAllPendingJobs(type: String): List + fun markJobAsSucceeded(jobId: String) + fun markJobAsFailed(jobId: String) + fun getAllPendingJobs(type: String): Map fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? fun getMessageSendJob(messageSendJobID: String): MessageSendJob? fun resumeMessageSendJobIfNeeded(messageSendJobID: String) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt index f1351786182..1d8b1a71702 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/AttachmentUploadJob.kt @@ -108,7 +108,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess val messageSendJob = storage.getMessageSendJob(messageSendJobID) MessageSender.handleFailedMessageSend(this.message, e) if (messageSendJob != null) { - storage.markJobAsFailed(messageSendJob) + storage.markJobAsFailed(messageSendJobID) } } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 9b1f65f4a7e..85a1f33289b 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -78,10 +78,24 @@ class JobQueue : JobDelegate { return } hasResumedPendingJobs = true - val allJobTypes = listOf(AttachmentDownloadJob.KEY, AttachmentDownloadJob.KEY, MessageReceiveJob.KEY, MessageSendJob.KEY, NotifyPNServerJob.KEY) + val allJobTypes = listOf(AttachmentDownloadJob.KEY, + AttachmentDownloadJob.KEY, + MessageReceiveJob.KEY, + MessageSendJob.KEY, + NotifyPNServerJob.KEY + ) allJobTypes.forEach { type -> val allPendingJobs = MessagingModuleConfiguration.shared.storage.getAllPendingJobs(type) - allPendingJobs.sortedBy { it.id }.forEach { job -> + val pendingJobs = mutableListOf() + for ((id, job) in allPendingJobs) { + if (job == null) { + // job failed to serialize, remove it from the DB + handleJobFailedPermanently(id) + } else { + pendingJobs.add(job) + } + } + pendingJobs.sortedBy { it.id }.forEach { job -> Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.") queue.offer(job) // Offer always called on unlimited capacity } @@ -89,17 +103,18 @@ class JobQueue : JobDelegate { } override fun handleJobSucceeded(job: Job) { - MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(job) + val jobId = job.id ?: return + MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(jobId) } override fun handleJobFailed(job: Job, error: Exception) { job.failureCount += 1 val storage = MessagingModuleConfiguration.shared.storage if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")} - storage.persistJob(job) if (job.failureCount == job.maxFailureCount) { - storage.markJobAsFailed(job) + handleJobFailedPermanently(job, error) } else { + storage.persistJob(job) val retryInterval = getRetryInterval(job) Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).") timer.schedule(delay = retryInterval) { @@ -110,10 +125,13 @@ class JobQueue : JobDelegate { } override fun handleJobFailedPermanently(job: Job, error: Exception) { - job.failureCount += 1 + val jobId = job.id ?: return + handleJobFailedPermanently(jobId) + } + + private fun handleJobFailedPermanently(jobId: String) { val storage = MessagingModuleConfiguration.shared.storage - storage.persistJob(job) - storage.markJobAsFailed(job) + storage.markJobAsFailed(jobId) } private fun getRetryInterval(job: Job): Long { From ccd9493f628407ad08e3cfd5aa427cb721b85573 Mon Sep 17 00:00:00 2001 From: jubb Date: Fri, 7 May 2021 12:02:12 +1000 Subject: [PATCH 2/4] refactor: remove unlimited by array size and cap at 10_000_000 --- .../org/session/libsession/messaging/jobs/MessageSendJob.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index c695df83119..de25a6fbb88 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -79,7 +79,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { override fun serialize(): Data { val kryo = Kryo() kryo.isRegistrationRequired = false - val output = Output(ByteArray(4096), -1) // maxBufferSize '-1' will dynamically grow internally if we run out of room serializing the message + val output = Output(ByteArray(4096), 10_000_000) // maxBufferSize '-1' will dynamically grow internally if we run out of room serializing the message kryo.writeClassAndObject(output, message) output.close() val serializedMessage = output.toBytes() From d707433f28a1d20e2c88dd19ed8dae823fc75f1c Mon Sep 17 00:00:00 2001 From: jubb Date: Fri, 7 May 2021 12:03:04 +1000 Subject: [PATCH 3/4] docs: remove no long applicable docs --- .../org/session/libsession/messaging/jobs/MessageSendJob.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt index de25a6fbb88..83822c4fc76 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/MessageSendJob.kt @@ -79,7 +79,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job { override fun serialize(): Data { val kryo = Kryo() kryo.isRegistrationRequired = false - val output = Output(ByteArray(4096), 10_000_000) // maxBufferSize '-1' will dynamically grow internally if we run out of room serializing the message + val output = Output(ByteArray(4096), 10_000_000) kryo.writeClassAndObject(output, message) output.close() val serializedMessage = output.toBytes() From e7377d640f91f3fb75b9b726587ec1f718356ba5 Mon Sep 17 00:00:00 2001 From: jubb Date: Fri, 7 May 2021 13:36:35 +1000 Subject: [PATCH 4/4] fix: use AttachmentUploadJob instead of two download job keys in all types --- .../main/java/org/session/libsession/messaging/jobs/JobQueue.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index 85a1f33289b..cffb2db7d60 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -78,7 +78,7 @@ class JobQueue : JobDelegate { return } hasResumedPendingJobs = true - val allJobTypes = listOf(AttachmentDownloadJob.KEY, + val allJobTypes = listOf(AttachmentUploadJob.KEY, AttachmentDownloadJob.KEY, MessageReceiveJob.KEY, MessageSendJob.KEY,