diff --git a/assets/css/elements/base.css b/assets/css/elements/base.css index 09c6a429f..d7e48b926 100644 --- a/assets/css/elements/base.css +++ b/assets/css/elements/base.css @@ -138,8 +138,8 @@ th.center { text-align: center; } -table th > thead > tr, -.table th > thead > tr { +table > thead > tr, +.table > thead > tr { background: var(--background-color); } diff --git a/assets/css/views/tags.css b/assets/css/views/tags.css index 1e12f29ae..057f8ca9f 100644 --- a/assets/css/views/tags.css +++ b/assets/css/views/tags.css @@ -49,6 +49,9 @@ /* Tags */ .tag { + --tag-background: var(--tag-normal-background); + --tag-border: var(--tag-normal-border); + --tag-color: var(--tag-normal-color); border: 1px solid; display: inline-table; align-items: center; @@ -56,9 +59,9 @@ font-weight: bold; margin-bottom: 5px; margin-right: 5px; - background: var(--tag-normal-background); - border-color: var(--tag-normal-border); - color: var(--tag-normal-color); + background: var(--tag-background); + border-color: var(--tag-border); + color: var(--tag-color); } .tag > span { @@ -76,7 +79,7 @@ } .tag__count { - background-color: var(--tag-normal-border); + background-color: var(--tag-border); color: var(--foreground-color); font-weight: normal; } @@ -92,13 +95,9 @@ @define-mixin tag-category $cat { .tag[data-tag-category="$(cat)"] { - background: var(--tag-category-$(cat)-background); - border-color: var(--tag-category-$(cat)-border); - color: var(--tag-category-$(cat)-color); - } - - .tag[data-tag-category="$(cat)"] .tag__count { - background: var(--tag-category-$(cat)-border); + --tag-background: var(--tag-category-$(cat)-background); + --tag-border: var(--tag-category-$(cat)-border); + --tag-color: var(--tag-category-$(cat)-color); } } diff --git a/lib/mix/tasks/reindex_all.ex b/lib/mix/tasks/reindex_all.ex index b1af4887a..41b6f85d7 100644 --- a/lib/mix/tasks/reindex_all.ex +++ b/lib/mix/tasks/reindex_all.ex @@ -11,6 +11,6 @@ defmodule Mix.Tasks.ReindexAll do raise "do not run this task unless you know what you're doing" end - SearchIndexer.recreate_reindex_all_destructive!() + SearchIndexer.recreate_reindex_all_destructive!(maintenance: false) end end diff --git a/lib/philomena/comments.ex b/lib/philomena/comments.ex index bb07ff07e..2970f42b2 100644 --- a/lib/philomena/comments.ex +++ b/lib/philomena/comments.ex @@ -9,11 +9,13 @@ defmodule Philomena.Comments do alias PhilomenaQuery.Search alias Philomena.UserStatistics + alias Philomena.Users.User alias Philomena.Comments.Comment alias Philomena.Comments.SearchIndex, as: CommentIndex alias Philomena.IndexWorker alias Philomena.Images.Image alias Philomena.Images + alias Philomena.Tags.Tag alias Philomena.Notifications alias Philomena.Versions alias Philomena.Reports @@ -265,7 +267,18 @@ defmodule Philomena.Comments do end def indexing_preloads do - [:user, image: :tags] + user_query = select(User, [u], map(u, [:id, :name])) + tag_query = select(Tag, [t], map(t, [:id])) + + image_query = + Image + |> select([i], struct(i, [:approved, :hidden_from_users, :id])) + |> preload(tags: ^tag_query) + + [ + user: user_query, + image: image_query + ] end def perform_reindex(column, condition) do diff --git a/lib/philomena/images.ex b/lib/philomena/images.ex index 360d9853a..1a068fbff 100644 --- a/lib/philomena/images.ex +++ b/lib/philomena/images.ex @@ -876,16 +876,25 @@ defmodule Philomena.Images do end def indexing_preloads do + user_query = select(User, [u], map(u, [:id, :name])) + sources_query = select(Source, [s], map(s, [:image_id, :source])) + alias_tags_query = select(Tag, [t], map(t, [:aliased_tag_id, :name])) + + base_tags_query = + Tag + |> select([t], [:category, :id, :name]) + |> preload(aliases: ^alias_tags_query) + [ - :user, - :favers, - :downvoters, - :upvoters, - :hiders, - :deleter, :gallery_interactions, - :sources, - tags: [:aliases, :aliased_tag] + sources: sources_query, + user: user_query, + favers: user_query, + downvoters: user_query, + upvoters: user_query, + hiders: user_query, + deleter: user_query, + tags: base_tags_query ] end diff --git a/lib/philomena/maintenance.ex b/lib/philomena/maintenance.ex new file mode 100644 index 000000000..22aa5b41e --- /dev/null +++ b/lib/philomena/maintenance.ex @@ -0,0 +1,108 @@ +defmodule Philomena.Maintenance do + @moduledoc """ + Functions related to online and offline maintenance tasks. + """ + + @typedoc "Progress from a stream job." + @type progress_report :: %{ + curr: integer(), + rate: number(), + remaining_time: number() + } + + @doc """ + Periodically stream progress reports for a stream task that produces a range + of integers between `min` and `max`, estimating the rate of progress and time + remaining. + """ + @spec stream_progress( + id_stream :: Enumerable.t({:ok, integer()}), + min :: integer(), + max :: integer(), + report_period :: number() + ) :: Enumerable.t(progress_report()) + def stream_progress(id_stream, min, max, report_period \\ 1.0) do + # Reference point for comparison during the stream. + begin = now() + + # Estimate progress counters based on how many objects have been + # processed since the initial reference point. + create_report = fn state, curr_id -> + curr_rate = (curr_id - min) / max(now() - begin, 1) + remaining_time = (max - curr_id) / max(curr_rate, 1) + + %{ + state: state, + curr: curr_id, + rate: round(curr_rate), + remaining_time: remaining_time + } + end + + # Convert input items received after every period elapses into + # a report, then concatenate an additional report after all items + # are processed. + id_stream + |> Stream.transform(begin, fn {:ok, curr_id}, prev_time -> + curr_time = now() + + if curr_time - prev_time > report_period do + {[create_report.(:in_progress, curr_id)], curr_time} + else + {[], prev_time} + end + end) + |> Stream.concat(Stream.map([[]], fn _ -> create_report.(:done, max) end)) + end + + @doc """ + Write progress reports to the console for a stream task that produces a range + of integers between `min` and `max`, estimating the rate of progress and time + remaining. + """ + @spec log_progress( + id_stream :: Enumerable.t({:ok, integer()}), + label :: String.t(), + min :: integer(), + max :: integer(), + report_period :: number() + ) :: :ok + def log_progress(id_stream, label, min, max, report_period \\ 1.0) do + id_stream + |> stream_progress(min, max, report_period) + |> Enum.each(fn p -> + # Clear line + IO.write("\e[2K\r") + + # Newline on report depends on whether stream is finished + case p.state do + :in_progress -> + eta = format_eta(p.remaining_time) + + IO.write("#{label}: #{p.curr}/#{max} [#{p.rate}/sec], ETA: #{eta}") + + :done -> + IO.puts("#{label}: #{p.curr}/#{max} [#{p.rate}/sec], done.") + end + end) + end + + @spec format_eta(number()) :: String.t() + defp format_eta(remaining_time) do + seconds = round(remaining_time) + minutes = div(seconds, 60) + hours = div(minutes, 60) + + cond do + seconds < 45 -> "about #{seconds} second(s)" + seconds < 90 -> "about a minute" + minutes < 45 -> "about #{minutes} minute(s)" + true -> "about #{hours} hour(s)" + end + end + + @spec now() :: float() + defp now do + :erlang.system_time(:microsecond) / 1_000_000 + end +end diff --git a/lib/philomena/posts.ex b/lib/philomena/posts.ex index 0c2e39ea9..b1d3a73c2 100644 --- a/lib/philomena/posts.ex +++ b/lib/philomena/posts.ex @@ -11,6 +11,7 @@ defmodule Philomena.Posts do alias Philomena.Topics.Topic alias Philomena.Topics alias Philomena.UserStatistics + alias Philomena.Users.User alias Philomena.Posts.Post alias Philomena.Posts.SearchIndex, as: PostIndex alias Philomena.IndexWorker @@ -310,7 +311,17 @@ defmodule Philomena.Posts do end def indexing_preloads do - [:user, topic: :forum] + user_query = select(User, [u], map(u, [:id, :name])) + + topic_query = + Topic + |> select([t], struct(t, [:forum_id, :title])) + |> preload([:forum]) + + [ + user: user_query, + topic: topic_query + ] end def perform_reindex(column, condition) do diff --git a/lib/philomena/search_indexer.ex b/lib/philomena/search_indexer.ex index cf564a5c9..9dc8cf0d0 100644 --- a/lib/philomena/search_indexer.ex +++ b/lib/philomena/search_indexer.ex @@ -17,7 +17,9 @@ defmodule Philomena.SearchIndexer do alias Philomena.Tags alias Philomena.Tags.Tag + alias Philomena.Maintenance alias Philomena.Polymorphic + alias Philomena.Repo import Ecto.Query @schemas [ @@ -60,10 +62,10 @@ defmodule Philomena.SearchIndexer do :ok """ - @spec recreate_reindex_all_destructive! :: :ok - def recreate_reindex_all_destructive! do + @spec recreate_reindex_all_destructive!(opts :: Keyword.t()) :: :ok + def recreate_reindex_all_destructive!(opts \\ []) do @schemas - |> Stream.map(&recreate_reindex_schema_destructive!/1) + |> Stream.map(&recreate_reindex_schema_destructive!(&1, opts)) |> Stream.run() end @@ -77,12 +79,12 @@ defmodule Philomena.SearchIndexer do :ok """ - @spec recreate_reindex_schema_destructive!(schema :: module()) :: :ok - def recreate_reindex_schema_destructive!(schema) when schema in @schemas do + @spec recreate_reindex_schema_destructive!(schema :: module(), opts :: Keyword.t()) :: :ok + def recreate_reindex_schema_destructive!(schema, opts \\ []) when schema in @schemas do Search.delete_index!(schema) Search.create_index!(schema) - reindex_schema(schema) + reindex_schema(schema, opts) end @doc """ @@ -94,10 +96,10 @@ defmodule Philomena.SearchIndexer do :ok """ - @spec reindex_all :: :ok - def reindex_all do + @spec reindex_all(opts :: Keyword.t()) :: :ok + def reindex_all(opts \\ []) do @schemas - |> Stream.map(&reindex_schema/1) + |> Stream.map(&reindex_schema(&1, opts)) |> Stream.run() end @@ -110,10 +112,30 @@ defmodule Philomena.SearchIndexer do :ok """ - @spec reindex_schema(schema :: module()) :: :ok - def reindex_schema(schema) + @spec reindex_schema(schema :: module(), opts :: Keyword.t()) :: :ok + def reindex_schema(schema, opts \\ []) do + maintenance = Keyword.get(opts, :maintenance, true) + + if maintenance do + query = limit(schema, 1) + min = Repo.one(order_by(query, asc: :id)).id + max = Repo.one(order_by(query, desc: :id)).id + + schema + |> reindex_schema_impl(opts) + |> Maintenance.log_progress(inspect(schema), min, max) + else + schema + |> reindex_schema_impl(opts) + |> Stream.run() + end + end + + @spec reindex_schema_impl(schema :: module(), opts :: Keyword.t()) :: + Enumerable.t({:ok, integer()}) + defp reindex_schema_impl(schema, opts) - def reindex_schema(Report) do + defp reindex_schema_impl(Report, opts) do # Reports currently require handling for their polymorphic nature Report |> preload([:user, :admin]) @@ -125,25 +147,24 @@ defmodule Philomena.SearchIndexer do |> Enum.map(&Search.index_document(&1, Report)) end, timeout: :infinity, - max_concurrency: max_concurrency() + max_concurrency: max_concurrency(opts) ) - |> Stream.run() end - def reindex_schema(schema) when schema in @schemas do + defp reindex_schema_impl(schema, opts) when schema in @schemas do # Normal schemas can simply be reindexed with indexing_preloads context = Map.fetch!(@contexts, schema) schema |> preload(^context.indexing_preloads()) - |> Search.reindex(schema, + |> Search.reindex_stream(schema, batch_size: @batch_sizes[schema], - max_concurrency: max_concurrency() + max_concurrency: max_concurrency(opts) ) end - @spec max_concurrency() :: pos_integer() - defp max_concurrency do - System.schedulers_online() + @spec max_concurrency(opts :: Keyword.t()) :: pos_integer() + defp max_concurrency(opts) do + Keyword.get(opts, :max_concurrency, System.schedulers_online()) end end diff --git a/lib/philomena_query/search.ex b/lib/philomena_query/search.ex index ff6a29b1a..cfc3a93e3 100644 --- a/lib/philomena_query/search.ex +++ b/lib/philomena_query/search.ex @@ -189,6 +189,10 @@ defmodule PhilomenaQuery.Search do Note that indexing is near real-time and requires an index refresh before documents will become visible. Unless changed in the mapping, this happens after 5 seconds have elapsed. + > #### Warning {: .warning} + > The returned stream must be enumerated for the reindex to process. If you do not care + > about the progress IDs yielded, use `reindex/3` instead. + ## Example query = @@ -196,11 +200,14 @@ defmodule PhilomenaQuery.Search do where: i.id < 100_000, preload: ^Images.indexing_preloads() - Search.reindex(query, Image, batch_size: 5000) + query + |> Search.reindex_stream(Image, batch_size: 1024) + |> Enum.each(&IO.inspect/1) """ - @spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok - def reindex(queryable, module, opts \\ []) do + @spec reindex_stream(queryable(), schema_module(), Batch.batch_options()) :: + Enumerable.t({:ok, integer()}) + def reindex_stream(queryable, module, opts \\ []) do max_concurrency = Keyword.get(opts, :max_concurrency, 1) index = @policy.index_for(module) @@ -208,10 +215,10 @@ defmodule PhilomenaQuery.Search do |> Batch.query_batches(opts) |> Task.async_stream( fn query -> + records = Repo.all(query) + lines = - query - |> Repo.all() - |> Enum.flat_map(fn record -> + Enum.flat_map(records, fn record -> doc = index.as_json(record) [ @@ -221,10 +228,52 @@ defmodule PhilomenaQuery.Search do end) Api.bulk(@policy.opensearch_url(), lines) + + last_id(records) end, timeout: :infinity, max_concurrency: max_concurrency ) + |> flatten_stream() + end + + defp last_id([]), do: [] + defp last_id(records), do: [Enum.max_by(records, & &1.id).id] + + @spec flatten_stream(Enumerable.t({:ok, [integer()]})) :: Enumerable.t({:ok, integer()}) + defp flatten_stream(stream) do + # Converts [{:ok, [1, 2]}] into [{:ok, 1}, {:ok, 2}] + Stream.transform(stream, [], fn {:ok, last_id}, _ -> + {Enum.map(last_id, &{:ok, &1}), []} + end) + end + + @doc """ + Efficiently index a batch of documents in the index named by the module. + + This function is substantially more efficient than running `index_document/2` for + each instance of a schema struct and can index with hundreds of times the throughput. + + The queryable should be a schema type with its indexing preloads included in + the query. The options are forwarded to `PhilomenaQuery.Batch.record_batches/3`. + + Note that indexing is near real-time and requires an index refresh before documents will + become visible. Unless changed in the mapping, this happens after 5 seconds have elapsed. + + ## Example + + query = + from i in Image, + where: i.id < 100_000, + preload: ^Images.indexing_preloads() + + Search.reindex(query, Image, batch_size: 1024) + + """ + @spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok + def reindex(queryable, module, opts \\ []) do + queryable + |> reindex_stream(module, opts) |> Stream.run() end