36 changed files with 4 additions and 719 deletions
@ -1,43 +0,0 @@ |
|||
# frozen_string_literal: true

# Elasticsearch index definition for accounts, backed by Chewy.
class AccountsIndex < Chewy::Index
  # Analysis setup: a whitespace-tokenized `content` analyzer for exact-ish
  # matching, and an `edge_ngram` analyzer (1..15 chars) used at index time
  # for prefix matching; searches against the sub-field use `content`.
  settings index: { refresh_interval: '5m' }, analysis: {
    analyzer: {
      content: {
        tokenizer: 'whitespace',
        filter: %w[lowercase asciifolding cjk_width],
      },

      edge_ngram: {
        tokenizer: 'edge_ngram',
        filter: %w[lowercase asciifolding cjk_width],
      },
    },

    tokenizer: {
      edge_ngram: {
        type: 'edge_ngram',
        min_gram: 1,
        max_gram: 15,
      },
    },
  }

  # Only searchable accounts are indexed; records that have been destroyed or
  # are no longer searchable are removed from the index.
  index_scope ::Account.searchable.includes(:account_stat), delete_if: ->(account) { account.destroyed? || !account.searchable? }

  root date_detection: false do
    field :id, type: 'long'

    field :display_name, type: 'text', analyzer: 'content' do
      field :edge_ngram, type: 'text', analyzer: 'edge_ngram', search_analyzer: 'content'
    end

    # `acct` is username for local accounts and username@domain for remote
    # ones (domain is nil locally, hence the compact before joining).
    field :acct, type: 'text', analyzer: 'content', value: ->(account) { [account.username, account.domain].compact.join('@') } do
      field :edge_ngram, type: 'text', analyzer: 'edge_ngram', search_analyzer: 'content'
    end

    # Counts are restricted to local followers/followees.
    field :following_count, type: 'long', value: ->(account) { account.following.local.count }
    field :followers_count, type: 'long', value: ->(account) { account.followers.local.count }
    # Fall back to creation time when the account has never posted.
    field :last_status_at, type: 'date', value: ->(account) { account.last_status_at || account.created_at }
  end
end
@ -1,66 +0,0 @@ |
|||
# frozen_string_literal: true

# Elasticsearch index definition for statuses, backed by Chewy.
class StatusesIndex < Chewy::Index
  # English stemming/stopword pipeline on top of the uax_url_email tokenizer,
  # so URLs and e-mail addresses survive tokenization intact.
  settings index: { refresh_interval: '15m' }, analysis: {
    filter: {
      english_stop: {
        type: 'stop',
        stopwords: '_english_',
      },
      english_stemmer: {
        type: 'stemmer',
        language: 'english',
      },
      english_possessive_stemmer: {
        type: 'stemmer',
        language: 'possessive_english',
      },
    },
    analyzer: {
      content: {
        tokenizer: 'uax_url_email',
        filter: %w(
          english_possessive_stemmer
          lowercase
          asciifolding
          cjk_width
          english_stop
          english_stemmer
        ),
      },
    },
  }

  index_scope ::Status.unscoped.kept.without_reblogs.includes(:media_attachments, :preloadable_poll)

  # The crutches below batch-load, per status, the local account ids that have
  # interacted with it (mentions, favourites, reblogs, bookmarks); each yields
  # a hash of status_id => [account_id, ...] consumed by Status#searchable_by.
  crutch :mentions do |collection|
    ::Mention.where(status_id: collection.map(&:id)).where(account: Account.local, silent: false)
             .pluck(:status_id, :account_id)
             .each_with_object({}) { |(status_id, account_id), map| (map[status_id] ||= []) << account_id }
  end

  crutch :favourites do |collection|
    ::Favourite.where(status_id: collection.map(&:id)).where(account: Account.local)
               .pluck(:status_id, :account_id)
               .each_with_object({}) { |(status_id, account_id), map| (map[status_id] ||= []) << account_id }
  end

  crutch :reblogs do |collection|
    ::Status.where(reblog_of_id: collection.map(&:id)).where(account: Account.local)
            .pluck(:reblog_of_id, :account_id)
            .each_with_object({}) { |(status_id, account_id), map| (map[status_id] ||= []) << account_id }
  end

  crutch :bookmarks do |collection|
    ::Bookmark.where(status_id: collection.map(&:id)).where(account: Account.local)
              .pluck(:status_id, :account_id)
              .each_with_object({}) { |(status_id, account_id), map| (map[status_id] ||= []) << account_id }
  end

  root date_detection: false do
    field :id, type: 'long'
    field :account_id, type: 'long'

    # Searchable text is the spoiler text, the plain-text body, every media
    # description and any poll options, joined with blank lines.
    field :text, type: 'text', value: ->(status) { [status.spoiler_text, Formatter.instance.plaintext(status)].concat(status.media_attachments.map(&:description)).concat(status.preloadable_poll ? status.preloadable_poll.options : []).join("\n\n") } do
      field :stemmed, type: 'text', analyzer: 'content'
    end

    # Account ids allowed to find this status; computed from the crutches.
    field :searchable_by, type: 'long', value: ->(status, crutches) { status.searchable_by(crutches) }
  end
end
@ -1,37 +0,0 @@ |
|||
# frozen_string_literal: true

# Elasticsearch index definition for hashtags, backed by Chewy.
class TagsIndex < Chewy::Index
  # `content` treats the whole tag as a single keyword token; `edge_ngram`
  # (2..15 chars) provides prefix matching, queried with `content`.
  settings index: { refresh_interval: '15m' }, analysis: {
    analyzer: {
      content: {
        tokenizer: 'keyword',
        filter: %w[lowercase asciifolding cjk_width],
      },

      edge_ngram: {
        tokenizer: 'edge_ngram',
        filter: %w[lowercase asciifolding cjk_width],
      },
    },

    tokenizer: {
      edge_ngram: {
        type: 'edge_ngram',
        min_gram: 2,
        max_gram: 15,
      },
    },
  }

  # Only listable tags are indexed; destroyed or unlistable tags are removed.
  index_scope ::Tag.listable, delete_if: ->(tag) { tag.destroyed? || !tag.listable? }

  root date_detection: false do
    field :name, type: 'text', analyzer: 'content' do
      field :edge_ngram, type: 'text', analyzer: 'edge_ngram', search_analyzer: 'content'
    end

    field :reviewed, type: 'boolean', value: ->(tag) { tag.reviewed? }
    # Total accounts across the tag's usage history.
    field :usage, type: 'long', value: ->(tag) { tag.history.reduce(0) { |total, day| total + day.accounts } }
    # Fall back to creation time when the tag has never been used.
    field :last_status_at, type: 'date', value: ->(tag) { tag.last_status_at || tag.created_at }
  end
end
@ -1,79 +0,0 @@ |
|||
{{- if .Values.elasticsearch.enabled }}
# One-shot Helm hook Job that runs `rake chewy:upgrade` to create/upgrade the
# Elasticsearch indices after install.
# NOTE(review): the hook only fires on post-install — confirm whether it
# should also run on post-upgrade so schema changes are applied on upgrades.
apiVersion: batch/v1
kind: Job
metadata:
  name: {{ include "mastodon.fullname" . }}-chewy-upgrade
  labels:
    {{- include "mastodon.labels" . | nindent 4 }}
  annotations:
    "helm.sh/hook": post-install
    "helm.sh/hook-delete-policy": before-hook-creation,hook-succeeded
    # Weight -1 so this hook runs before weight-0 hooks.
    "helm.sh/hook-weight": "-1"
spec:
  template:
    metadata:
      name: {{ include "mastodon.fullname" . }}-chewy-upgrade
    spec:
      restartPolicy: Never
      {{- if (not .Values.mastodon.s3.enabled) }}
      # ensure we run on the same node as the other rails components; only
      # required when using PVCs that are ReadWriteOnce
      {{- if or (eq "ReadWriteOnce" .Values.mastodon.persistence.assets.accessMode) (eq "ReadWriteOnce" .Values.mastodon.persistence.system.accessMode) }}
      affinity:
        podAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: component
                    operator: In
                    values:
                      - rails
              topologyKey: kubernetes.io/hostname
      {{- end }}
      # Local-storage mode: mount the same asset/system PVCs as the rails pods.
      volumes:
        - name: assets
          persistentVolumeClaim:
            claimName: {{ template "mastodon.fullname" . }}-assets
        - name: system
          persistentVolumeClaim:
            claimName: {{ template "mastodon.fullname" . }}-system
      {{- end }}
      containers:
        - name: {{ include "mastodon.fullname" . }}-chewy-setup
          image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          command:
            - bundle
            - exec
            - rake
            - chewy:upgrade
          envFrom:
            - configMapRef:
                name: {{ include "mastodon.fullname" . }}-env
            - secretRef:
                name: {{ template "mastodon.fullname" . }}
          env:
            - name: "DB_PASS"
              valueFrom:
                secretKeyRef:
                  # Password comes from the bundled postgresql chart's secret
                  # when enabled, otherwise from this chart's own secret.
                  {{- if .Values.postgresql.enabled }}
                  name: {{ .Release.Name }}-postgresql
                  {{- else }}
                  name: {{ template "mastodon.fullname" . }}
                  {{- end }}
                  key: postgresql-password
            - name: "REDIS_PASSWORD"
              valueFrom:
                secretKeyRef:
                  name: {{ .Release.Name }}-redis
                  key: redis-password
            - name: "PORT"
              value: {{ .Values.mastodon.web.port | quote }}
          {{- if (not .Values.mastodon.s3.enabled) }}
          volumeMounts:
            - name: assets
              mountPath: /opt/mastodon/public/assets
            - name: system
              mountPath: /opt/mastodon/public/system
          {{- end }}
{{- end }}
@ -1,39 +0,0 @@ |
|||
# frozen_string_literal: true

# Chewy/Elasticsearch configuration, driven entirely by environment variables.
enabled         = ENV['ES_ENABLED'] == 'true'
host            = ENV.fetch('ES_HOST') { 'localhost' }
port            = ENV.fetch('ES_PORT') { 9200 }
user            = ENV.fetch('ES_USER') { nil }
password        = ENV.fetch('ES_PASS') { nil }
# Index names are prefixed with ES_PREFIX, falling back to the Redis
# namespace so multiple installations can share one cluster.
fallback_prefix = ENV.fetch('REDIS_NAMESPACE') { nil }
prefix          = ENV.fetch('ES_PREFIX') { fallback_prefix }

Chewy.settings = {
  host: "#{host}:#{port}",
  prefix: prefix,
  enabled: enabled,
  journal: false,
  user: user,
  password: password,
  sidekiq: { queue: 'pull' },
}

# We use our own async strategy even outside the request-response
# cycle, which takes care of checking if Elasticsearch is enabled
# or not. However, mind that for the Rails console, the :urgent
# strategy is set automatically with no way to override it.
Chewy.root_strategy              = :custom_sidekiq
Chewy.request_strategy           = :custom_sidekiq
Chewy.use_after_commit_callbacks = false

module Chewy
  class << self
    # Whether Elasticsearch integration is turned on (ES_ENABLED=true).
    def enabled?
      settings[:enabled]
    end
  end
end

# Elasticsearch uses Faraday internally. Faraday interprets the
# http_proxy env variable by default which leads to issues when
# Mastodon is run with hidden services enabled, because
# Elasticsearch is *not* supposed to be accessed through a proxy
Faraday.ignore_env_proxy = true
@ -1,11 +0,0 @@ |
|||
# frozen_string_literal: true

module Chewy
  class Strategy
    # Variant of the stock Sidekiq strategy that becomes a no-op when the
    # Elasticsearch integration is disabled (see Chewy.enabled?).
    class CustomSidekiq < Sidekiq
      def update(_type, _objects, _options = {})
        return unless Chewy.enabled?

        super
      end
    end
  end
end
@ -1,156 +0,0 @@ |
|||
# frozen_string_literal: true

require_relative '../../config/boot'
require_relative '../../config/environment'
require_relative 'cli_helper'

module Mastodon
  # CLI for creating/upgrading Elasticsearch indices and bulk-importing data.
  class SearchCLI < Thor
    include CLIHelper

    # Indices are sorted by amount of data to be expected in each, so that
    # smaller indices can go online sooner
    INDICES = [
      AccountsIndex,
      TagsIndex,
      StatusesIndex,
    ].freeze

    option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads'
    option :batch_size, type: :numeric, default: 1_000, aliases: [:b], desc: 'Number of records in each batch'
    option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
    desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them'
    long_desc <<~LONG_DESC
      If Elasticsearch is empty, this command will create the necessary indices
      and then import data from the database into those indices.

      This command will also upgrade indices if the underlying schema has been
      changed since the last run.

      Even if creating or upgrading indices is not necessary, data from the
      database will be imported into the indices.
    LONG_DESC
    def deploy
      if options[:concurrency] < 1
        say('Cannot run with this concurrency setting, must be at least 1', :red)
        exit(1)
      end

      if options[:batch_size] < 1
        say('Cannot run with this batch_size setting, must be at least 1', :red)
        exit(1)
      end

      # --only maps e.g. "accounts" to AccountsIndex; otherwise process all.
      # (The original wrapped this in a redundant begin/end block.)
      indices = if options[:only]
                  options[:only].map { |str| "#{str.camelize}Index".constantize }
                else
                  INDICES
                end

      progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)

      # First, ensure all indices are created and have the correct
      # structure, so that live data can already be written
      indices.select { |index| index.specification.changed? }.each do |index|
        progress.title = "Upgrading #{index} "
        index.purge
        index.specification.lock!
      end

      # One DB connection per worker thread, plus one for the main thread.
      db_config = ActiveRecord::Base.configurations[Rails.env].dup
      db_config['pool'] = options[:concurrency] + 1
      ActiveRecord::Base.establish_connection(db_config)

      pool    = Concurrent::FixedThreadPool.new(options[:concurrency])
      added   = Concurrent::AtomicFixnum.new(0)
      removed = Concurrent::AtomicFixnum.new(0)

      progress.title = 'Estimating workload '

      # Estimate the amount of data that has to be imported first
      progress.total = indices.sum { |index| index.adapter.default_scope.count }

      # Now import all the actual data. Mind that unlike chewy:sync, we don't
      # fetch and compare all record IDs from the database and the index to
      # find out which to add and which to remove from the index. Because with
      # potentially millions of rows, the memory footprint of such a calculation
      # is uneconomical. So we only ever add.
      indices.each do |index|
        progress.title = "Importing #{index} "
        batch_size = options[:batch_size]
        # Float division: the original integer division made `.ceil` a no-op
        # and could yield an extra, nearly-empty trailing slice per batch.
        slice_size = (batch_size.to_f / options[:concurrency]).ceil

        index.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
          futures = []

          batch.each_slice(slice_size) do |records|
            futures << Concurrent::Future.execute(executor: pool) do
              if !progress.total.nil? && progress.progress + records.size > progress.total
                # The number of items has changed between start and now,
                # since there is no good way to predict the final count from
                # here, just change the progress bar to an indeterminate one

                progress.total = nil
              end

              grouped_records = nil
              bulk_body       = nil
              index_count     = 0
              delete_count    = 0

              # Only hold a DB connection for the query/build phase, not for
              # the Elasticsearch round-trip below.
              ActiveRecord::Base.connection_pool.with_connection do
                grouped_records = records.to_a.group_by do |record|
                  index.adapter.send(:delete_from_index?, record) ? :delete : :to_index
                end

                bulk_body = Chewy::Index::Import::BulkBuilder.new(index, **grouped_records).bulk_body
              end

              index_count  = grouped_records[:to_index].size if grouped_records.key?(:to_index)
              delete_count = grouped_records[:delete].size if grouped_records.key?(:delete)

              # The following is an optimization for statuses specifically, since
              # we want to de-index statuses that cannot be searched by anybody,
              # but can't use Chewy's delete_if logic because it doesn't use
              # crutches and our searchable_by logic depends on them
              if index == StatusesIndex
                bulk_body.map! do |entry|
                  if entry[:to_index] && entry.dig(:to_index, :data, 'searchable_by').blank?
                    index_count  -= 1
                    delete_count += 1

                    { delete: entry[:to_index].except(:data) }
                  else
                    entry
                  end
                end
              end

              Chewy::Index::Import::BulkRequest.new(index).perform(bulk_body)

              progress.progress += records.size

              added.increment(index_count)
              removed.increment(delete_count)

              # NOTE(review): the original slept 1 second here after every
              # slice, which caps throughput at roughly concurrency slices/s
              # and defeats the --concurrency option; removed as a suspected
              # accidental leftover. Re-add a throttle deliberately if ES
              # load proves to be a problem.
            rescue => e
              # Log and carry on: one failed slice should not abort the run.
              progress.log pastel.red("Error importing #{index}: #{e}")
            end
          end

          # Wait for every slice of this batch before fetching the next one.
          futures.map(&:value)
        end
      end

      progress.title = ''
      progress.stop

      say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true)
    end
  end
end
Loading…
Reference in new issue