Учебник по схеме
Учебник по схемеУрок 28: Обновление больших наборов данных

Урок 28: Обновление больших наборов данных

Иногда нам нужно обновить тысячи ресурсов в одном действии, как выражено в следующем комментарии (опубликованном в сообществе WordPress):

Я обнаружил, что у многих клиентов мне приходится работать с большими наборами данных (более 10 000 вариаций продукта для 1 продукта или более 13 000 медиафайлов)... неизбежно клиенты хотят иметь возможность массово редактировать множество вещей сразу — например, пометить 2000 медиафайлов одним тегом.

В этом уроке учебника мы рассмотрим способы решения этой задачи.

Nested Mutations

Чтобы этот GraphQL-запрос работал, Конфигурация схемы, применённая к эндпоинту, должна иметь включённые Вложенные мутации

Благодаря Вложенным мутациям мы можем получать и обновлять тысячи ресурсов из БД с помощью одного GraphQL-запроса:

mutation ReplaceOldWithNewDomainInPosts {
  posts(pagination: { limit: 3000 }) {
    id
    rawContent
    adaptedRawContent: _strReplace(
      search: "https://my-old-domain.com"
      replaceWith: "https://my-new-domain.com"
      in: $__rawContent
    )
    update(input: {
      contentAs: { html: $__adaptedRawContent }
    }) {
      status
      errors {
        __typename
        ...on ErrorPayload {
          message
        }
      }
    }
  }
}

Однако в зависимости от устойчивости системы это единственное выполнение GraphQL может создать слишком большую нагрузку на БД и даже привести к её сбою.

Разбивка выполнения GraphQL-запроса на страницы

Если обновление тысяч ресурсов одновременно приводит к сбою системы, решение простое: вместо однократного выполнения GraphQL для тысяч ресурсов можно выполнять его сотни раз для нескольких десятков ресурсов за раз.

Следующий bash-скрипт сначала определяет общее количество комментариев с помощью commentCount, затем вычисляет сегменты с учётом переменной окружения $ENTRIES_TO_PROCESS, а также вычисляет параметры разбивки на страницы и вызывает GraphQL-запрос для каждого сегмента (просто получая комментарии из этого сегмента):

# Get the number of comments in the site
GRAPHQL_RESPONSE=$(curl
  -X POST \
  -H "Content-Type: application/json" \
  -d '{"query": "{\n  commentCount\n}"}' \
  https://mysite.com/graphql/)
 
# Extract the number of comments into a variable
COMMENT_COUNT=$(echo $GRAPHQL_RESPONSE \
  | grep -E -o '"commentCount\":([0-9]+)' \
  | cut -d':' -f2-)
 
echo "Number of comments: $COMMENT_COUNT"
 
# How many entries will be processed on each query
ENTRIES_TO_PROCESS=10
 
# Calculate how many requests must be triggered
PAGINATION_COUNT=$(($(($COMMENT_COUNT / $ENTRIES_TO_PROCESS)) + $(($(($COMMENT_COUNT % $ENTRIES_TO_PROCESS)) ? 1 : 0))))
 
echo "Number of requests to process (at $ENTRIES_TO_PROCESS entries per request): $PAGINATION_COUNT"
 
# Execute the requests, at one per second
for PAGINATION_NUMBER in $(seq 0 $(($PAGINATION_COUNT - 1))); do sleep 1 && echo "\n\nPagination number: $PAGINATION_NUMBER\n" && curl -X POST -H "Content-Type: application/json" -d "{\"query\": \"{ comments(pagination: { limit: $ENTRIES_TO_PROCESS, offset: $(($PAGINATION_NUMBER * $ENTRIES_TO_PROCESS)) }) { id date content } }\"}" https://mysite.com/graphql/ ; done

Рекурсивное выполнение GraphQL-запроса

Поскольку приведённое выше решение предполагает использование bash-скриптов, его необходимо запускать через CLI (или какую-либо панель администратора или инструмент), что ограничивает его применимость.

Мы можем воспроизвести ту же логику непосредственно в GraphQL-запросе, что позволит выполнять его прямо внутри WordPress (и даже сохранить как GraphQL Persisted Query).

Приведённый ниже GraphQL-запрос выполняется рекурсивно. При первом вызове он:

  • Делит общее количество ресурсов для обновления на сегменты (вычисляемые с использованием переданной переменной $limit)
  • Выполняет себя через новый HTTP-запрос для каждого из сегментов (передавая соответствующий $offset в качестве переменной), тем самым обновляя только подмножество всех ресурсов за один раз

GraphQL-запрос является рекурсивным: HTTP-запросы указывают на тот же URL, что и текущий (плюс добавляется переменная $offset для данного сегмента), для чего мы получаем URL (а также тело, метод и заголовки) из текущего HTTP-запроса (с помощью расширения HTTP Request via Schema).

Аргумент $async, переданный в _sendHTTPRequests, установлен в false, чтобы HTTP-запросы выполнялись последовательно один за другим. Кроме того, необязательная переменная $delay позволяет указать, сколько миллисекунд ждать перед отправкой каждого запроса.

После того как все ресурсы обновлены, выполнение GraphQL-запроса достигает конца и завершается:

# When first invoked, we do not pass variable `$offset`
# Then `$offset` is `null`, and dynamic variable `$executeQuery` will be `true`
query ExportExecute(
  $offset: Int
) {
  executeQuery: _notNull(value: $offset)
    @export(as: "executeQuery")
    @remove # Comment this directive to visualize output during development
}
 
# Only calculate the segments on the first invocation of the GraphQL query
query CalculateVars($limit: Int! = 10)
  @depends(on: "ExportExecute")
  @skip(if: $executeQuery)
{
  # Calculate the number of HTTP requests to be sent
  commentCount
  fractionalNumberExecutions: _floatDivide(number: $__commentCount, by: $limit)
    @remove # Comment this directive to visualize output during development
  numberExecutions: _floatCeil(number: $__fractionalNumberExecutions)
  
  # Generate a list of the offset
  arrayOffsets: _arrayPad(array: [], length: $__numberExecutions, value: null)
    @underEachArrayItem(
      passIndexOnwardsAs: "position"
    )
      @applyField(
        name: "_intMultiply"
        arguments: {
          multiply: $position
          with: $limit
        }
        setResultInResponse: true
      )
    @export(as: "offsets")
 
  # Vars needed to generate a list of the HTTP Request inputs,
  # with many of them retrieved from the current HTTP request data
  url: _httpRequestFullURL
    @export(as: "url")
    @remove # Comment this directive to visualize output during development
  method: _httpRequestMethod
    @export(as: "method")
    @remove # Comment this directive to visualize output during development
  headers: _httpRequestHeaders
    @remove # Comment this directive to visualize output during development
  headersInputList: _objectConvertToNameValueEntryList(
    object: $__headers
  )
    @export(as: "headersInputList")
    @remove # Comment this directive to visualize output during development
  body: _httpRequestBody
    @remove # Comment this directive to visualize output during development
  bodyJSONObject: _strDecodeJSONObject(string: $__body)
    @export(as: "bodyJSONObject")
    @remove # Comment this directive to visualize output during development
  bodyHasVariables: _propertyIsSetInJSONObject(
    object: $__bodyJSONObject,
    by: { key: "variables" }
  )
    @export(as: "bodyHasVariables")
    @remove # Comment this directive to visualize output during development
}
 
query GenerateVars
  @depends(on: ["ExportExecute", "CalculateVars"])
  @skip(if: $executeQuery)
{
  bodyJSON: _echo(value: $bodyJSONObject)
    @unless(condition: $bodyHasVariables)
      @objectAddEntry(
        key: "variables"
        value: {}
      )
    @export(as: "bodyJSON")
    @remove # Comment this directive to visualize output during development
}
 
# Generate all the HTTPRequestInput objects to send each of the HTTP requests
query GenerateRequestInputs(
  $timeout: Float,
  $delay: Int
)
  @depends(on: ["ExportExecute", "GenerateVars"])
  @skip(if: $executeQuery)
{
  # Generate a list of the HTTP Request inputs (without the offset)
  requestInputs: _echo(value: $offsets)
    @underEachArrayItem(
      passValueOnwardsAs: "requestOffset"
      affectDirectivesUnderPos: [1, 2]
    )
      @applyField(
        name: "_objectAddEntry",
        arguments: {
          object: $bodyJSON
          underPath: "variables"
          key: "offset"
          value: $requestOffset
        },
        passOnwardsAs: "itemJSON"
      )
      @applyField(
        name: "_echo",
        arguments: {
          value: {
            url: $url
            method: $method
            options: {
              headers: $headersInputList
              json: $itemJSON
              timeout: $timeout
              delay: $delay
            }
          }
        },
        setResultInResponse: true
      )
    @export(as: "requestInputs")
    @remove # Comment this directive to visualize output during development
}
 
# Execute all the generated URLs, either asynchronously or not
query ExecuteURLs
  @depends(on: ["ExportExecute", "GenerateRequestInputs"])
  @skip(if: $executeQuery)
{
  _sendHTTPRequests(
    async: false
    inputs: $requestInputs
  ) {
    statusCode
    contentType
    body
      @remove
    bodyJSON: _strDecodeJSONObject(string: $__body)
  }
}
 
# This is the actual execution of the query.
# In this case, it simply prints the time when it was executed,
# the provided query variables, and the comment IDs for that segment
query ExecuteQuery(
  $offset: Int
  $limit: Int! = 10
)
  @depends(on: "ExportExecute")
  @include(if: $executeQuery)
{
  executionTime: _httpRequestRequestTime
  queryVariables: _sprintf(string: "[$limit: %s, $offset: %s]", values: [$limit, $offset])
  comments(
    pagination: { limit: $limit, offset: $offset }
    sort: { order: ASC, by: ID }
  ) {
    id
  }
}
 
query ExecuteAll
  @depends(on: ["ExecuteURLs", "ExecuteQuery"])
{
  id
    @remove
}

Ответ:

{
  "data": {
    "commentCount": 23,
    "numberExecutions": 3,
    "arrayOffsets": [
      0,
      10,
      20
    ],
    "_sendHTTPRequests": [
      {
        "statusCode": 200,
        "contentType": "application/json",
        "bodyJSON": {
          "data": {
            "executionTime": 1689814467,
            "queryVariables": "[$limit: 10, $offset: 0]",
            "comments": [
              {
                "id": 2
              },
              {
                "id": 3
              },
              {
                "id": 4
              },
              {
                "id": 5
              },
              {
                "id": 6
              },
              {
                "id": 7
              },
              {
                "id": 8
              },
              {
                "id": 9
              },
              {
                "id": 10
              },
              {
                "id": 11
              }
            ]
          }
        }
      },
      {
        "statusCode": 200,
        "contentType": "application/json",
        "bodyJSON": {
          "data": {
            "executionTime": 1689814468,
            "queryVariables": "[$limit: 10, $offset: 10]",
            "comments": [
              {
                "id": 12
              },
              {
                "id": 13
              },
              {
                "id": 16
              },
              {
                "id": 17
              },
              {
                "id": 18
              },
              {
                "id": 19
              },
              {
                "id": 20
              },
              {
                "id": 21
              },
              {
                "id": 22
              },
              {
                "id": 23
              }
            ]
          }
        }
      },
      {
        "statusCode": 200,
        "contentType": "application/json",
        "bodyJSON": {
          "data": {
            "executionTime": 1689814470,
            "queryVariables": "[$limit: 10, $offset: 20]",
            "comments": [
              {
                "id": 24
              },
              {
                "id": 25
              },
              {
                "id": 26
              }
            ]
          }
        }
      }
    ]
  }
}