Многопоточная непрерывная обработка данных на платформе 1С с помощью Mutex
Концепция
Разработчики многопоточных алгоритмов на некоторых низкоуровневых языках программирования не могут пройти мимо близкого знакомства с такими сущностями, как Mutex (Мьютекс) или Spinlock (Спинлок).
В языке 1С такая концепция отсутствует, ввиду невозможности прямого доступа к данным модуля другого потока: всё "общение" с потоком происходит через передачу ему параметров через значение и и ожидания возврата результата во временном хранилище. Таким образом, в 1С для выполнения тяжелой задачи требуется разбить его на небольшие порции и для каждой порции запускать обработку в отдельном потоке, затем ожидать окончания этих потоков и собирать результаты из временных хранилищ в какой-то один общий результат.
Типичный 1С-алгоритм разбивает весь объем данных на число разрешенных ядер, и запускает такое же количество потоков. Основной недостаток такого подхода: если какая-то порция обработается быстро и поток завершится, то освободившееся ядро нельзя загрузить оставшимися данными, так как всё уже заранее распределено на другие потоки. Второй встречающийся вариант - поделить на мелкие порции и для каждой мелкой порции запускать свой поток. Однако запуск потока и его завершение - это сравнительно ресурсоёмкая операция.
В связи с вышеозначенными проблемами тема мьютекса периодичеки возникает на просторах Интернета среди наиболее продвинутых сообществ 1С, но дальше разговоров до каких-либо функционирующих реализаций дело не доходит, по крайней мере я их не встречал в свободном доступе. Так давайте раскроем вопрос до конца! Для начала взглянем на пример работы mutex с использованием библиотеки pthread на языке C:
#include <pthread.h>
pthread_mutex_t mutex;
uint32_t someVar;
void startInThreads( ... ) {
// инициализация мьютекса:
pthread_mutex_init(&mutex, NULL);
// { здесь запуск потоков и ожидание их завершения }
// уничтожение мьютекса:
pthread_mutex_destroy(&mutex);
}
// код потока
void* runThread(void* ptr) {
while(true) {
// { здесь может быть код с длительными вычислениями }
// Накладываем блокировку мьютекса:
pthread_mutex_lock(&mutex);
// Этот отрезок кода выполняется в каждый отрезок времени только в
// одном потоке, другие конкурирующие потоки ждут освобождения мьютекса.
// Можно безопасно читать и записывать в глобальные переменные:
someVar = someFunc();
// освобождаем мьютекс для других потоков:
pthread_mutex_unlock(&mutex);
// { здесь может быть код с длительными вычислениями }
}
}
Как видно в пояснении, существует участок кода, который может выполняться эксклюзивно только в одном из потоков на отрезке времени, а остальные достигнув этот участок кода будут ожидать освобождения мьютекса. Таким образом разрешается конфликт одновременного чтения и записи из нескольких потоков. Эксперты 1С сразу же вспомнят, что это похоже на поведения транзакций СУБД с уровенем изоляции Serializable. Ну что ж, угадали, именно этот механизм я и буду использовать для реализации Mutex на 1С. Реализация будет на паттерне объектного программирования (заранее извиняюсь за термины из Java).
Реализация mutex средствами платформы 1С
Итак, представим себе что мы являемся разработчиками ЗУП и попытаемся слепить подобие мьютекса "из того что было". В качестве основы нашего нового класса (в терминах ООП) будем использовать независимый непериодический регистр сведений. Создаём новый и добавляем одно измерение типа Число (длина 1, точность 0, неотрицательное) и один ресурс типа ХранилищеЗначения:
В модуле менеджера создадим три public static метода:
Процедура СоздатьЭкземпляр(Ид, Данные = Неопределено) Экспорт
МенеджерЗаписи = РегистрыСведений.Мьютекс.СоздатьМенеджерЗаписи();
МенеджерЗаписи.Ид = Ид;
МенеджерЗаписи.Данные = Новый ХранилищеЗначения(Данные);
МенеджерЗаписи.Записать();
КонецПроцедуры
Функция ПолучитьЭкземпляр(Ид) Экспорт
НаборЗаписей = РегистрыСведений.Мьютекс.СоздатьНаборЗаписей();
НаборЗаписей.Ид = Ид;
Возврат НаборЗаписей;
КонецФункции
Процедура УдалитьЭкземпляр(Ид) Экспорт
МенеджерЗаписи = РегистрыСведений.Мьютекс.СоздатьМенеджерЗаписи();
МенеджерЗаписи.Ид = Ид;
МенеджерЗаписи.Прочитать();
МенеджерЗаписи.Удалить();
КонецПроцедуры
В модуле набора записей описываем public переменную, главный public метод и один private метод:
Перем Ид Экспорт;
Процедура ОбработатьДанныеЭксклюзивно(Действие = "", ДанныеПотока = Неопределено) Экспорт
НачатьТранзакцию();
Блокировка = Новый БлокировкаДанных;
ЭлементБлокировки = Блокировка.Добавить();
ЭлементБлокировки.Область = "РегистрСведений.Мьютекс";
ЭлементБлокировки.Режим = РежимБлокировкиДанных.Исключительный;
ЭлементБлокировки.УстановитьЗначение("Ид", Ид);
// установка блокировки
Блокировка.Заблокировать();
Прочитать();
Данные = ЭтотОбъект[0].Данные.Получить();
ПроизвестиОбменДанными(Действие, ДанныеПотока, Данные);
ЭтотОбъект[0].Данные = Новый ХранилищеЗначения(Данные);
Записать();
ЗафиксироватьТранзакцию();
КонецПроцедуры
// Процедура - Произвести обмен данными между ДаннымиПотока и общими Данными
//
// Параметры:
// ДанныеПотока - Произвольный - Данные, полученные из потока
// Данные - Произвольный - Общие данные для храненения параметров и результата
//
Процедура ПроизвестиОбменДанными(Действие, ДанныеПотока, Данные)
// пишем код обмена здесь
КонецПроцедуры
Схема работы с мьютексом такая: весь объем обрабатываемых данных делим на небольшие порции, количество этих порций гораздо больше числа потоков. Далее запускаем требуемое число потоков и эти потоки начинают обрабатывать параллельно порции данных, периодически считывая новые задания и сбрасывая результат в Данные, при этом потоки не завершаются и работают до тех пор, пока не закончатся задания (необработанные порции данных).
Процедура ОбработатьДанныеЭксклюзивно (на схеме это жёлтые блоки)) и является ядром механизма Mutex и предназначена для вызова из потоков. Глобальные Данные будут храниться в хранилище значения в регистре, а данные для записи или чтения из него мы будем передавать в параметре ДанныеПотока. Таким образом между ними можно будет осуществлять обмен в режиме онлайн прямо во время выполнения потоков! Логику обмена прописываем в процедуре ПроизвестиОбменДанными.
Пример полной реализации работы mutex на примере вычисления простых чисел
Я намеренно использую самый долгий неоптимальный алгоритм вычисления простых чисел в заданном диапазоне, чтобы одинаковые по длине порции данных обрабатывались с существенно разной скоростью и расчёты занимали больше времени.
Создаём общий серверный модуль ДлительныеРасчеты и добавляем:
Функция ПростыеЧислаВИнтервале(НачалоИнтервала, КонецИнтервала)
Результат = Новый Массив;
Для Ч = НачалоИнтервала По КонецИнтервала Цикл
ЭтоПростоеЧисло = Истина;
Для М = 2 По Ч - 1 Цикл
Если Ч % М = 0 Тогда
ЭтоПростоеЧисло = Ложь;
КонецЕсли;
КонецЦикла;
Если ЭтоПростоеЧисло Тогда
Результат.Добавить(Ч);
КонецЕсли;
КонецЦикла;
Возврат Результат;
КонецФункции
Для организации списка заданий и раздачи порций потокам понадобится таблица значений. Его и будем использовать в качестве Данных и держать в ресурсе регистра сведений. Здесь мне пришла мысль, что эта таблица должна быть вообще универсальной для разных задач, поэтому сразу добавим в модуль менеджера Мьютекса следующую функцию, которая создает таблицу заданий и сразу заполняет его порциями:
Функция СоздатьЗаполнитьТаблицуЗаданий(Начало, Конец, ДлинаПорции) Экспорт
Результат = Новый ТаблицаЗначений;
ТипЧисло = Новый ОписаниеТипов("Число");
ТипСтрока = Новый ОписаниеТипов("Строка");
// Начало диапазона:
Результат.Колонки.Добавить("Начало", ТипЧисло);
// Конец дипазона:
Результат.Колонки.Добавить("Конец", ТипЧисло);
// Статус задания:
Результат.Колонки.Добавить("Статус", ТипСтрока);
// Результат задания (произвольный, необязательный):
Результат.Колонки.Добавить("Результат");
Н = Начало;
Пока Истина Цикл
К = Н + ДлинаПорции;
Если К > Конец Тогда
К = Конец;
КонецЕсли;
НСтр = Результат.Добавить();
НСтр.Начало = Н;
НСтр.Конец = К;
Н = К + 1;
Если Н >= Конец Тогда
Прервать;
КонецЕсли;
КонецЦикла;
Возврат Результат;
КонецФункции
В колонке Статус будем помечать, что порция захвачена потоком или была обработана. Результат обработки порции будем записывать в колонку Результат. Чаще всего в реальной жизни придется записывать результаты длительной обработки в базу данных (например переписывать регистры, создавать документы), поэтому запись в колонку Результат является факультативной. Но для нашей задачи вполне уместно записывать именно в неё.
Должен предупредить, что перезапись ТаблицыЗначений в ресурсе типа ХранилищеЗначения является возможно не самым эффективным способом раздачи порций, в иных условиях может быть выгоднее использовать строки и измерения регистра сведений, а блокировку устанавливать более избирательно, особенно если число порций исчисляется десятками и сотнями тысяч. Способ через Хранилище проще для понимания всей механики и требует меньше кода.
Код потока добавляем в общий модуль ДлительныеРасчеты:
Процедура ПростыеЧисла() Экспорт
ДанныеПотока = Новый Структура("Статус,Начало,Конец,Результат");
ДанныеПотока.Статус = "Пустой";
ДанныеПотока.Начало = 0;
ДанныеПотока.Конец = 0;
ДанныеПотока.Результат = Неопределено;
Мьютекс = РегистрыСведений.Мьютекс.ПолучитьЭкземпляр(0);
Пока Истина Цикл
// отдаем результат (если есть) и забираем новую порцию:
Мьютекс.ОбработатьДанныеЭксклюзивно("ОтдатьЗабратьПорцию", ДанныеПотока);
Если ДанныеПотока.Статус = "Пустой" Тогда
// все задания закончились, завершаем поток
Прервать;
Иначе
ДанныеПотока.Результат = ПростыеЧислаВИнтервале(ДанныеПотока.Начало, ДанныеПотока.Конец);
КонецЕсли;
КонецЦикла;
Конецпроцедуры
Для обмена данными понадобится дописать в модуле набора записей:
Процедура ПроизвестиОбменДанными(Действие, ДанныеПотока, Данные)
Если Действие = "ОтдатьЗабратьПорцию" Тогда
ОтдатьЗабратьПорцию(ДанныеПотока, Данные)
ИначеЕсли Действие = "ПрочитатьРезультат" Тогда
ПрочитатьРезультат(ДанныеПотока, Данные);
КонецЕсли;
КонецПроцедуры
Процедура ОтдатьЗабратьПорцию(ДанныеПотока, Данные)
Если ДанныеПотока.Результат <> Неопределено Тогда
П = Новый Структура;
П.Вставить("Статус", "ВыданоЗадание");
П.Вставить("Начало", ДанныеПотока.Начало);
П.Вставить("Конец", ДанныеПотока.Конец);
ДанныеСтроки = Данные.НайтиСтроки(П);
Если ДанныеСтроки.Количество() > 0 Тогда
Строка = ДанныеСтроки[0];
ЗаполнитьЗначенияСвойств(Строка, ДанныеПотока);
Строка.Статус = "Завершено";
КонецЕсли;
КонецЕсли;
П = Новый Структура("Статус", "");
ДанныеСтроки = Данные.НайтиСтроки(П);
Если ДанныеСтроки.Количество() > 0 Тогда
Строка = ДанныеСтроки[0];
Строка.Статус = "ВыданоЗадание";
ЗаполнитьЗначенияСвойств(ДанныеПотока, Строка);
Иначе
ДанныеПотока.Статус = "Пустой";
КонецЕсли;
КонецПроцедуры
Процедура ПрочитатьРезультат(ДанныеПотока, Данные)
ДанныеПотока = Данные;
КонецПроцедуры
Для запуска всего этого нужно создать обработку и в форме дописать следующий код:
&НаСервере
Процедура ПростыеЧислаНаСервере()
ВремяНачала = ТекущаяДатаСеанса();
ТаблицаЗаданий = РегистрыСведений.Мьютекс.СоздатьЗаполнитьТаблицуЗаданий(
НачалоИнтервала,
КонецИнтервала,
ДлинаПорции
);
РегистрыСведений.Мьютекс.СоздатьЭкземпляр(0, ТаблицаЗаданий);
МассивПотоков = Новый Массив;
Для НомерПотока = 1 По КоличествоПотоков Цикл
Поток = ФоновыеЗадания.Выполнить(
"ДлительныеРасчеты.ПростыеЧисла",
,
"поток#" + Строка(НомерПотока),
"Поиск простых числел"
);
МассивПотоков.Добавить(Поток);
КонецЦикла;
Пока Истина Цикл
МассивПотоков = ФоновыеЗадания.ОжидатьЗавершенияВыполнения(МассивПотоков);
ВсеЗаданияЗавершены = Истина;
Для Каждого Поток Из МассивПотоков Цикл
Если Поток.Состояние = СостояниеФоновогоЗадания.Активно Тогда
ВсеЗаданияЗавершены = Ложь;
Прервать;
КонецЕсли;
КонецЦикла;
Если ВсеЗаданияЗавершены Тогда
Прервать;
КонецЕсли;
КонецЦикла;
РегистрыСведений.Мьютекс
.ПолучитьЭкземпляр(0)
.ОбработатьДанныеЭксклюзивно(
"ПрочитатьРезультат",
ТаблицаЗаданий
);
РегистрыСведений.Мьютекс.УдалитьЭкземпляр(0);
ДлительностьВыполнения = ТекущаяДатаСеанса() - ВремяНачала;
Задания.Загрузить(ТаблицаЗаданий); // в реквизит формы
Сообщить("Время выполнения: " + ДлительностьВыполнения + " сек.");
КонецПроцедуры
В конце публикации будет ссылка на скачивание демо-конфигурации, в которой добавлены замеры производительности, в том числе на ожиданиях блокировки.
Давайте посмотрим как это работает с различной длиной порций данных (50 и 200):
Как видно, при более мелких порциях происходит больше ожидания на блокировках, но вклад этих ожиданий несущественен для общего результата и в целом с более короткой порцией отрабатывает на 1-2 секунду меньше за счет более эффективного распределения по ядрам. Левый график для порции 50, правый - 200:
Желающие могут самостоятельно убедиться с помощью счётчиков производительности, что всплеска дисковых операций не происходит - весь интенсивный обмен происходит в кэше - оперативной памяти сервера. Естественно, при прикладной разработке нужно подбирать длину порции таким, чтобы время его обработки существенно превышало время обмена данными (жёлтый блок).