Многопоточность в Java. Основы
от aNNiMON
В этой статье я затрону большую и сложную тему многопоточности в Java. Разумеется, обо всём в одной статье я не смогу рассказать, поэтому затрону лишь самые основные темы.
Существуют понятия процесс и поток. Процессу выделяется память, какие-то ресурсы, а также он содержит по крайней мере один поток. У потока же есть набор инструкций для выполнения кода.
Поскольку у одного процесса может быть несколько потоков, а ресурсы у процесса одни, то все эти потоки будут работать с этими одними ресурсами, причём почти одновременно. Здесь и кроется вся сложность создания многопоточных приложений.
Преимущества многопоточности
Не смотря на множество проблем, многопоточность очень полезна в пользовательских интерфейсах. Пока один поток занимается обработкой данных, второй преспокойно отрисовывает графику. В результате мы видим на экране плавную анимацию и можем взаимодействовать с элементами интерфейса, не опасаясь, что всё зависнет.
Также многопоточность позволяет улучшить скорость обработки данных: пока один поток подготавливает данные, например, выкачивая их из интернета мелкими порциями, второй и третий потоки могут их обрабатывать, а четвёртый записывать результат на диск.
Если же у нас многоядерный процессор, то потоки позволят существенно улучшить производительность, переложив работу на другие ядра.
Что ж, достаточно теории, перейдём к практике.
Создание потоков
В Java для работы с потоками служит класс Thread. Есть два способа создания нового потока, но их можно записать куда большим числом способов.
Первый вариант, это расширение класса Thread.
Второй - создание класса, реализующего интерфейс Runnable и передачу его экземпляра в конструктор класса Thread.
Как видно, во втором случае приходится чуть больше писать, но на самом деле преимущество всё-таки у этого способа, потому что мы можем и расширить класс, и реализовать интерфейс Runnable (например, class CustomRunnable extends JPanel implements Runnable). Первый же способ удобен в том случае, если надо переопределить или добавить новые методы в класс Thread.
Если новый класс для потока создавать накладно, то можно сделать всё в анонимном классе.
Точно так же и для Runnable
Эти два способа хорошо подходят для случаев, когда кода в методе run очень мало.
В Java 8 появились лямбда-выражения, а вместе с ними и ещё более короткие способы создания потока:
Или можно воспользоваться ссылкой на метод. Пожалуй, это самый компактный способ.
Можно ещё побаловаться с Reflection API, но это уж как-нибудь сами.
Прерывание потоков
Да-да, именно прерывание, а не остановка или пауза. Потоки нельзя остановить стандартными средствами, потому что с этим связано множество проблем. Поэтому методы stop, suspend, pause и destroy в классе Thread помечены как deprecated. Так что остановку приходится делать самому.
Вообще, поток в Java работает до тех пор, пока выполняется код в его методе run. Если в run будет простая операция, скажем, вывод текста в консоль, то поток сразу же завершится после этой операции. Если же мы в потоке записываем данные в цикле в 100 файлов, то поток будет работать до тех пор, пока не выполнит свою задачу.
И, к слову, если поток работает в фоне и вы закрываете программу, то процесс этой программы всё равно останется висеть в фоне, пока не завершатся все потоки. Чтобы по окончанию программы гарантированно получить завершение потоков, нужно сделать из потока демона . Звучит забавно, но так и есть:
Итак, у нас есть поток, который выполняет в цикле какую-то повторяющуюся операцию и нам нужно в некоторый момент времени его завершить. Самый очевидный способ, это завести логическую переменную и проверять её состояние. Чаще всего делают так:
Этот вариант имеет право на жизнь, но тогда поток нужно останавливать, вызывая метод stop. Во-первых, это не всегда удобно, а во-вторых, нужно лишнее поле в классе. Поэтому можно воспользоваться стандартными средствами, которые предоставляет класс Thread. Речь о флаге interrupted, который как раз и может послужить заменой нашему isRunning.
Для остановки потока теперь нам достаточно вызвать thread.interrupt(). В первом же случае, вызов этого метода не приведёт к желаемому результату.
Второй пример:
Потокам можно давать имена:
Вы уже заметили, что во всех блоках catch я пишу код Thread.currentThread().interrupt();? Это хорошая практика. Если во время паузы произошло исключение InterruptedException, то вы помечаете поток как interrupted и тем самым завершаете его выполнение (если использовали Thread.interrupted()). Если же оставлять блок catch пустым, то ничего не пометится. В некоторых случаях это может быть оправдано, но в остальном, рекомендую писать так.
С учётом исключения InterruptedException, можно переписать первый пример:
При исключении мы покинем цикл while и поток завершится. Оба метода runnable.stop() и thread.interrupt() теперь успешно завершают поток. Обратите внимание на ключевое слово volatile для поля isRunning, я объясню его значение чуть позже.
Проблема доступа к общим ресурсам
Как я говорил ранее, при работе с потоками может возникнуть множество проблем, поэтому важно уметь проектировать приложение так, чтобы этих проблем избежать. К счастью, в Java есть огромное множество решений, с которыми я вас постепенно буду знакомить.
Первая проблема - доступ к общим ресурсам из нескольких потоков. Допустим, есть некоторый счётчик:
И есть два потока, которые увеличивают его значение до тех пор, пока оно не станет равным 5000.
Очевидно, мы должны получить значение 5000, но не тут то было:
Запускаем ещё раз:
Почему так?
Тут вступает проблема видимости объекта.
Дело в том, что, изменив поле одним потоком, второй не сразу может увидеть это изменение или увидит его в другом порядке. Это связано с другим понятием атомарность.
Атомарной называется такая операция, которая может быть выполнена за один неделимый шаг.
Например, атомарной операцией будет присвоение числу int некоторой константы. А неатомарной - сумма двух чисел.
Чтобы пояснить, я окунусь глубже в JVM.
Некоторые операции в Java состоят из нескольких инструкций байт-кода:
Подвох вот в чём. В первом случае изменение поля происходит за одну инструкцию putstatic. Во втором, между getstatic и putstatic аж две инструкции. Ничто не мешает другому потоку, пока выполняется сложение, перезаписать значение поля counter.
Детальнее обо всём этом можно прочесть в статье: Модель памяти в примерах и не только / Хабрахабр
На 32-битных системах, присвоение значения типу long или double тоже будет неатомарной операцией. Но объявив поле с ключевым словом volatile, JVM будет гарантировать атомарность и видимость всем простым операциям, а именно чтению и записи поля. Вот только в нашем случае со счётчиком это не поможет - операция инкремента неатомарна.
Нам же нужен способ как-то блокировать другие потоки на время выполнения неатомарных операций. И этот способ есть - синхронизация.
В блоке синхронизации может находиться только один поток, остальные должны ожидать, когда этот поток покинет блок.
Для объявления блоков синхронизации в Java служит ключевое слово synchronized:
В качестве lockObject может выступать final поле или класс:
Теперь, чтобы решить нашу проблему со счётчиком, достаточно добавить неатомарную операцию в блок синхронизации.
На самом деле в этом коде тоже есть проблема, попробуйте найти её после прочтения раздела. Или читайте комментарии к статье.
Как это будет работать:
- Мы создали два потока и запустили их.
- Допустим, первый поток запустился быстрее и вошёл в цикл while. Второй пока запускается.
- Первый поток видит блок synchronized. Выполняется проверка - нет ли сейчас в этом блоке других потоков? Нет, поэтому первый поток заходит в блок. Второй пока что вошёл в цикл while.
- Первый поток сейчас в цикле for увеличивает счётчик. Второй поток доходит до блока synchronized. Снова выполняется проверка и поскольку поток внутри есть, разрешение войти внутрь не получено, а значит второй поток ждёт.
- Первый поток всё ещё в цикле for. Второй поток всё так же ждёт.
- Наконец, первый поток выходит из цикла for и покидает область синхронизации. Второй поток получает разрешение войти внутрь.
Таким образом, получается синхронизированная работа потоков.
Блоки синхронизации следует расставлять с умом. Если бы мы сделали вот так:
мы бы тоже получили верный результат 5000, вот только работал бы у нас только один поток:
- Создаём два потока и запускаем их.
- Допустим, первый поток запустился быстрее и вошёл в блок синхронизации. Второй пока запускается.
- Первый поток теперь в цикле while. Второй поток встретил блок synchonized и не получил разрешение войти.
- Первый поток работает. Второй ждёт.
- Спустя некоторое количество времени, первый поток увеличил счётчик до 5000 и вышел из циклов и блока синхронизации. Второму потоку разрешается войти внутрь.
- Первый поток завершил работу. Второй поток проверил, что условие Counter.get() < 5000 уже не выполняется и не вошёл в цикл while. Покинул блок синхронизации и завершился.
Другой вариант решения проблемы со счётчиком - сделать его методы get и increment синхронизированными. Тогда блок синхронизации в методе run не понадобится.
Это почти эквивалентно коду:
Отличия кроются в байткоде.
В первом случае метод помечается флагом SYNCHRONIZED
Во втором, добавляются дополнительные инструкции:
Что можно представить так:
То есть мы заходим в блок синхронизации, монитор. Возвращаем значение и выходим из монитора. Если во время критической секции произошло исключение, мы также выходим из монитора.
В байткоде synchronized метода инструкций monitorenter и monitorexit не было, но это не значит, что нет входа в монитор. Флаг SYNCHRONIZED у метода говорит JVM о том, что все эти инструкции нужно выполнить. То есть, они не появляются в коде, но сокрыты в JVM - она всё равно их выполнит.
Забегая вперёд, продемонстрирую ещё одно возможное решение проблемы. В пакете java.util.concurrent есть множество классов для различных многопоточных нужд. Одним из таких классов является AtomicInteger, который делает операции над числами атомарными.
Теперь нигде не нужно добавлять блок синхронизации.
О других классах, упрощающих работу с многопоточностью, я постараюсь рассказать в следующей статье.
Синхронизация. Пример проектирования многопоточного приложения
В довершение статьи, хочу показать пример небольшого приложения и важность правильного проектирования потоков.
Допустим, у нас есть 20 файлов с данными. Нам нужно наиболее эффективно прочитать эти файлы и вывести данные на экран. Данные, по мере чтения, будут добавляться в список и уже оттуда выводиться на экран.
Есть класс, отвечающий за панель рисования, туда мы будем добавлять объекты по мере чтения:
▌ Вариант 1. Без многопоточности
Казалось бы, нет потоков - нет проблем - будет работать медленно, но зато стабильно. Не тут-то было. При работе иногда выскакивает ошибка:
Ошибка ConcurrentModificationException появляется в том случае, когда предпринимается попытка доступа к списку из разных потоков.
Но погодите-ка, мы ведь не создавали потоков? Мы - нет, а вот для отрисовки интерфейса служит другой поток.
Так что, как ни крути, а доступ к списку нужно обернуть в блок синхронизации:
Теперь ошибок нет, но обработка 20 файлов занимает порядка пяти минут. Причём, первые файлы читаются быстро, а потом работа замедляется. Попробуйте понять, почему так происходит.
▌ Вариант 2. Один файл - один поток
Имея 20 потоков логично предположить, что работа пройдёт в 20 раз быстрее. Так бы оно и было, имей наш процессор хотя бы 20 ядер. В противном случае мы лишь создадим дополнительную нагрузку и все наши данные могут отрисовываться не очень плавно.
Тем не менее, время работы теперь: 40 секунд. Это долго. Та же проблема, что замедляла работу в первом варианте, замедляет всё и сейчас. Должен быть способ избавиться от synchronized блоков и такой способ есть.
▌ Вариант 3. Использование синхронизированного списка
Для того, чтобы сделать из обычного списка синхронизированный, вызываем метод
Теперь мы получим список, у которого все методы будут synchronized. Но, к сожалению, итератор, который используется в foreach таким образом не получится синхронизировать и нам придётся либо оборачивать его в synchronized блок, либо отказаться от него в пользу простого перебора элементов в цикле for.
Время работы теперь чуть менее 4 секунд!
▌ Вариант 4. Ограничение количества потоков
Мы можем ограничить количество потоков, создав пул:
Теперь можно создать хоть сотню потоков, выполняться в один момент времени будут не более maxThreads потоков.
Для добавления в пул существует метод execute(Runnable r). Пул потоков нужно завершать методом shutdown или shutdownNow. Метод awaitTermination(long timeout, TimeUnit unit) ждёт завершения потоков либо указанное в параметрах время, если к этому моменту есть работающие задачи.
Подробнее об Executor и ExecutorService я постараюсь рассказать в следующей статье.
Ограничим пул пятью потоками
Теперь время исполнения увеличилось до 11 секунд, но зато отрисовка проходит плавнее и мы можем быть уверены, что будь у нас сотня файлов, система не получит большую нагрузку.
Лучшей практикой, является ограничение пула потоков по количеству процессоров в системе:
▌ Вариант 5. java.util.concurrent
И снова я забегу вперёд и возьму что-нибудь из пакета java.util.concurrent. В этом пакете есть класс CopyOnWriteArrayList. Он настолько суров, что блоки синхронизации ему не нужны и можно беспрепятственно перебирать элементы в foreach.
Особых изменений в скорости работы при нескольких потоках мы не получим, но если запустить всё в одном потоке, как в первом случае, то вместо пяти минут мы получим 55 секунд!
Как можно понять из названия, его магия в том, что на каждую операцию добавления в список, создаётся новый массив. Это не очень хорошо в плане памяти, зато потокобезопасно. Подробнее разберёмся в следующей статье.
Наглядная работа всех вариантов представлена в этой анимации:
Версия побольше
Выводы
* Для работы с потоками в Java есть класс Thread.
* Поток можно создать расширив класс Thread или передав класс, реализовывающий интерфейс Runnable в конструктор класса Thread.
* Остановить поток в Java стандартными средствами нельзя.
* Для остановки потоков можно воспользоваться встроенным механизмом interrupt()/interrupted()
* При выходе из программы, JVM ожидает завершения всех потоков, кроме потоков-демонов.
* Потокам можно задавать имена в конструкторе или с помощью метода setName.
* Thread.sleep(100) приостановит текущий поток на 100 миллисекунд.
* thread.join() ждёт завершения потока.
* Необходимо синхронизировать работу потоков над общими ресурсами при помощи блоков синхронизации.
* synchronized метод и synchronized блок порождают разный байт-код на с точки зрения исполнения кода JVM это одно и то же.
* Ключевое слово volatile обеспечивает видимость и атомарность при чтении и записи полей для всех потоков.
* В пакете java.util.concurrent есть множество полезных классов для работы с многопоточностью.
* synchronized блоки нужно расставлять с умом, иначе получим потерю производительности.
* ArrayList потокоНЕбезопасен. Для создания потокобезопасной копии, есть метод Collections.synchronizedList.
* Для ограничения количества потоков существуют пулы потоков.
* Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() ) создаст пул потоков с числом ядер процессора, что является эффективным решением.
* Даже если вы не создаёте потоков, это не значит, что их нет в программе.
Исходный код: GitHub
Содержание
Существуют понятия процесс и поток. Процессу выделяется память, какие-то ресурсы, а также он содержит по крайней мере один поток. У потока же есть набор инструкций для выполнения кода.
Поскольку у одного процесса может быть несколько потоков, а ресурсы у процесса одни, то все эти потоки будут работать с этими одними ресурсами, причём почти одновременно. Здесь и кроется вся сложность создания многопоточных приложений.
Преимущества многопоточности
Не смотря на множество проблем, многопоточность очень полезна в пользовательских интерфейсах. Пока один поток занимается обработкой данных, второй преспокойно отрисовывает графику. В результате мы видим на экране плавную анимацию и можем взаимодействовать с элементами интерфейса, не опасаясь, что всё зависнет.
Также многопоточность позволяет улучшить скорость обработки данных: пока один поток подготавливает данные, например, выкачивая их из интернета мелкими порциями, второй и третий потоки могут их обрабатывать, а четвёртый записывать результат на диск.
Если же у нас многоядерный процессор, то потоки позволят существенно улучшить производительность, переложив работу на другие ядра.
Что ж, достаточно теории, перейдём к практике.
Создание потоков
В Java для работы с потоками служит класс Thread. Есть два способа создания нового потока, но их можно записать куда большим числом способов.
Первый вариант, это расширение класса Thread.
- public class CustomThread extends Thread {
- @Override
- public void run() {
- System.out.println("class CustomThread extends Thread { }");
- try {
- sleep(100); // пауза в 100 мс
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- System.out.println(1);
- }
- }
- // ...
- CustomThread thread1 = new CustomThread();
- thread1.start();
Второй - создание класса, реализующего интерфейс Runnable и передачу его экземпляра в конструктор класса Thread.
- public class CustomRunnable implements Runnable {
- @Override
- public void run() {
- System.out.println("class CustomRunnable implements Runnable { }");
- try {
- Thread.sleep(100);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- System.out.println(2);
- }
- }
- // ...
- Thread thread2 = new Thread(new CustomRunnable());
- thread2.start();
Как видно, во втором случае приходится чуть больше писать, но на самом деле преимущество всё-таки у этого способа, потому что мы можем и расширить класс, и реализовать интерфейс Runnable (например, class CustomRunnable extends JPanel implements Runnable). Первый же способ удобен в том случае, если надо переопределить или добавить новые методы в класс Thread.
Если новый класс для потока создавать накладно, то можно сделать всё в анонимном классе.
- Thread thread3 = new Thread() {
- @Override
- public void run() {
- System.out.println("new Thread() { }");
- try {
- sleep(100);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- System.out.println(3);
- }
- };
- thread3.start();
Точно так же и для Runnable
- Thread thread4 = new Thread(new Runnable() {
- @Override
- public void run() {
- System.out.println("new Thread(new Runnable() { })");
- try {
- Thread.sleep(100);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- System.out.println(4);
- }
- });
- thread4.start();
Эти два способа хорошо подходят для случаев, когда кода в методе run очень мало.
В Java 8 появились лямбда-выражения, а вместе с ними и ещё более короткие способы создания потока:
- Thread thread5 = new Thread(() -> {
- System.out.println("new Thread(() -> { })");
- try {
- Thread.sleep(100);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- System.out.println(5);
- });
- thread5.start();
Или можно воспользоваться ссылкой на метод. Пожалуй, это самый компактный способ.
- Thread thread6 = new Thread(Main::threadMethod);
- thread6.start();
- // ...
- private static void threadMethod() {
- System.out.println("new Thread(Main::threadMethod)");
- try {
- Thread.sleep(100);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- System.out.println(6);
- }
Можно ещё побаловаться с Reflection API, но это уж как-нибудь сами.
Прерывание потоков
Да-да, именно прерывание, а не остановка или пауза. Потоки нельзя остановить стандартными средствами, потому что с этим связано множество проблем. Поэтому методы stop, suspend, pause и destroy в классе Thread помечены как deprecated. Так что остановку приходится делать самому.
Вообще, поток в Java работает до тех пор, пока выполняется код в его методе run. Если в run будет простая операция, скажем, вывод текста в консоль, то поток сразу же завершится после этой операции. Если же мы в потоке записываем данные в цикле в 100 файлов, то поток будет работать до тех пор, пока не выполнит свою задачу.
И, к слову, если поток работает в фоне и вы закрываете программу, то процесс этой программы всё равно останется висеть в фоне, пока не завершатся все потоки. Чтобы по окончанию программы гарантированно получить завершение потоков, нужно сделать из потока демона . Звучит забавно, но так и есть:
- thread.setDaemon(true)
Итак, у нас есть поток, который выполняет в цикле какую-то повторяющуюся операцию и нам нужно в некоторый момент времени его завершить. Самый очевидный способ, это завести логическую переменную и проверять её состояние. Чаще всего делают так:
- private boolean isRunning;
- public void stop() {
- isRunning = false;
- }
- @Override
- public void run() {
- isRunning = true;
- String name = Thread.currentThread().getName();
- System.out.printf("Thread %s started\n", name);
- int counter = 0;
- while (isRunning) {
- System.out.printf("Thread %s. counter = %d\n", name, counter);
- counter++;
- try {
- Thread.sleep(100);
- } catch (InterruptedException ex) {
- System.out.printf("Thread %s interrupted\n", name);
- Thread.currentThread().interrupt();
- }
- }
- }
Этот вариант имеет право на жизнь, но тогда поток нужно останавливать, вызывая метод stop. Во-первых, это не всегда удобно, а во-вторых, нужно лишнее поле в классе. Поэтому можно воспользоваться стандартными средствами, которые предоставляет класс Thread. Речь о флаге interrupted, который как раз и может послужить заменой нашему isRunning.
- @Override
- public void run() {
- String name = Thread.currentThread().getName();
- System.out.printf("Thread %s started\n", name);
- int counter = 0;
- while (!Thread.interrupted()) {
- System.out.printf("Thread %s. counter = %d\n", name, counter);
- counter++;
- try {
- Thread.sleep(100);
- } catch (InterruptedException ex) {
- System.out.printf("Thread %s interrupted\n", name);
- Thread.currentThread().interrupt();
- }
- }
- }
Для остановки потока теперь нам достаточно вызвать thread.interrupt(). В первом же случае, вызов этого метода не приведёт к желаемому результату.
- // первый пример
- thread1.start();
- Thread.sleep(500);
- thread1.interrupt(); // не остановит поток
- Thread.sleep(50);
- System.out.printf("Thread %s isAlive: %b\n", thread1.getName(), thread1.isAlive());
- runnable.stop();
- System.out.printf("Thread %s isAlive: %b\n", thread1.getName(), thread1.isAlive());
- // вывод
- Thread Thread-0 started
- Thread Thread-0. counter = 0
- Thread Thread-0. counter = 1
- Thread Thread-0. counter = 2
- Thread Thread-0. counter = 3
- Thread Thread-0. counter = 4
- Thread Thread-0 interrupted
- Thread Thread-0. counter = 5
- Thread Thread-0 interrupted
- Thread Thread-0. counter = 6
- Thread Thread-0 interrupted
- // ...
- Thread Thread-0. counter = 28
- Thread Thread-0 interrupted
- Thread Thread-0. counter = 29
- Thread Thread-0 isAlive: true
- Thread Thread-0 interrupted
- Thread Thread-0 finished
- Thread Thread-0 isAlive: false
Второй пример:
- // второй пример
- thread2.start();
- Thread.sleep(500);
- thread2.interrupt();
- Thread.sleep(50);
- System.out.printf("Thread %s isAlive: %b\n", thread2.getName(), thread2.isAlive());
- // вывод
- Thread second started
- Thread second. counter = 0
- Thread second. counter = 1
- Thread second. counter = 2
- Thread second. counter = 3
- Thread second. counter = 4
- Thread second interrupted
- Thread second finished
- Thread second isAlive: false
Потокам можно давать имена:
- Thread thread2 = new Thread(new CustomRunnable2(), "second");
- // или
- thread2.setName("second");
Вы уже заметили, что во всех блоках catch я пишу код Thread.currentThread().interrupt();? Это хорошая практика. Если во время паузы произошло исключение InterruptedException, то вы помечаете поток как interrupted и тем самым завершаете его выполнение (если использовали Thread.interrupted()). Если же оставлять блок catch пустым, то ничего не пометится. В некоторых случаях это может быть оправдано, но в остальном, рекомендую писать так.
С учётом исключения InterruptedException, можно переписать первый пример:
- private boolean volatile isRunning;
- public void stop() {
- isRunning = false;
- }
- @Override
- public void run() {
- isRunning = true;
- String name = Thread.currentThread().getName();
- System.out.printf("Thread %s started\n", name);
- int counter = 0;
- try {
- while (isRunning) {
- System.out.printf("Thread %s. counter = %d\n", name, counter);
- counter++;
- Thread.sleep(100);
- }
- } catch (InterruptedException ex) {
- System.out.printf("Thread %s interrupted\n", name);
- Thread.currentThread().interrupt();
- }
- }
При исключении мы покинем цикл while и поток завершится. Оба метода runnable.stop() и thread.interrupt() теперь успешно завершают поток. Обратите внимание на ключевое слово volatile для поля isRunning, я объясню его значение чуть позже.
- // третий пример
- thread3.start();
- Thread.sleep(500);
- // now both methods works fine
- thread3.interrupt();
- //runnable3.stop();
- Thread.sleep(50);
- System.out.printf("Thread %s isAlive: %b\n", thread3.getName(), thread3.isAlive());
- // вывод
- Thread Thread-1 started
- Thread Thread-1. counter = 0
- Thread Thread-1. counter = 1
- Thread Thread-1. counter = 2
- Thread Thread-1. counter = 3
- Thread Thread-1. counter = 4
- Thread Thread-1 interrupted
- Thread Thread-1 finished
- Thread Thread-1 isAlive: false
Проблема доступа к общим ресурсам
Как я говорил ранее, при работе с потоками может возникнуть множество проблем, поэтому важно уметь проектировать приложение так, чтобы этих проблем избежать. К счастью, в Java есть огромное множество решений, с которыми я вас постепенно буду знакомить.
Первая проблема - доступ к общим ресурсам из нескольких потоков. Допустим, есть некоторый счётчик:
- public class Counter {
- private static int counter = 0;
- public static int get() {
- return counter;
- }
- public static void increment() {
- counter++;
- }
- }
И есть два потока, которые увеличивают его значение до тех пор, пока оно не станет равным 5000.
- private static void printCounter() {
- while (Counter.get() < 5000) {
- for (int i = 0; i < 10; i++) {
- Counter.increment();
- }
- try {
- Thread.sleep(1);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
- }
- // ...
- Runnable runnable = CounterProblem::printCounter;
- Thread thread1 = new Thread(runnable);
- Thread thread2 = new Thread(runnable);
- thread1.start();
- thread2.start();
- // ожидание завершения потоков
- thread1.join();
- thread2.join();
- System.out.println("All threads finished. Counter = " + Counter.get());
Очевидно, мы должны получить значение 5000, но не тут то было:
- All threads finished. Counter = 5003
Запускаем ещё раз:
- All threads finished. Counter = 5009
Почему так?
Тут вступает проблема видимости объекта.
Дело в том, что, изменив поле одним потоком, второй не сразу может увидеть это изменение или увидит его в другом порядке. Это связано с другим понятием атомарность.
Атомарной называется такая операция, которая может быть выполнена за один неделимый шаг.
Например, атомарной операцией будет присвоение числу int некоторой константы. А неатомарной - сумма двух чисел.
Чтобы пояснить, я окунусь глубже в JVM.
Некоторые операции в Java состоят из нескольких инструкций байт-кода:
- private int counter;
- counter = 6;
- // bipush 6
- // putstatic Counter.counter
- counter += 6;
- // getstatic Counter.counter
- // bipush 6
- // iadd
- // putstatic Counter.counter
Подвох вот в чём. В первом случае изменение поля происходит за одну инструкцию putstatic. Во втором, между getstatic и putstatic аж две инструкции. Ничто не мешает другому потоку, пока выполняется сложение, перезаписать значение поля counter.
Детальнее обо всём этом можно прочесть в статье: Модель памяти в примерах и не только / Хабрахабр
На 32-битных системах, присвоение значения типу long или double тоже будет неатомарной операцией. Но объявив поле с ключевым словом volatile, JVM будет гарантировать атомарность и видимость всем простым операциям, а именно чтению и записи поля. Вот только в нашем случае со счётчиком это не поможет - операция инкремента неатомарна.
Нам же нужен способ как-то блокировать другие потоки на время выполнения неатомарных операций. И этот способ есть - синхронизация.
В блоке синхронизации может находиться только один поток, остальные должны ожидать, когда этот поток покинет блок.
Для объявления блоков синхронизации в Java служит ключевое слово synchronized:
- synchronized (lockObject) {
- // критическая секция
- }
В качестве lockObject может выступать final поле или класс:
- private final Object lock = new Object();
- synchronized (lock) { }
- synchronized(this) { }
- synchronized(Main.class) { }
Теперь, чтобы решить нашу проблему со счётчиком, достаточно добавить неатомарную операцию в блок синхронизации.
- private static void printCounter() {
- while (Counter.get() < 5000) {
- synchronized (Counter.class) {
- for (int i = 0; i < 10; i++) {
- Counter.increment();
- }
- }
- try {
- Thread.sleep(1);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
- }
- // Выводит
- All threads finished. Counter = 5000
На самом деле в этом коде тоже есть проблема, попробуйте найти её после прочтения раздела. Или читайте комментарии к статье.
Как это будет работать:
- Мы создали два потока и запустили их.
- Допустим, первый поток запустился быстрее и вошёл в цикл while. Второй пока запускается.
- Первый поток видит блок synchronized. Выполняется проверка - нет ли сейчас в этом блоке других потоков? Нет, поэтому первый поток заходит в блок. Второй пока что вошёл в цикл while.
- Первый поток сейчас в цикле for увеличивает счётчик. Второй поток доходит до блока synchronized. Снова выполняется проверка и поскольку поток внутри есть, разрешение войти внутрь не получено, а значит второй поток ждёт.
- Первый поток всё ещё в цикле for. Второй поток всё так же ждёт.
- Наконец, первый поток выходит из цикла for и покидает область синхронизации. Второй поток получает разрешение войти внутрь.
Таким образом, получается синхронизированная работа потоков.
Блоки синхронизации следует расставлять с умом. Если бы мы сделали вот так:
- private static void printCounter() {
- synchronized (Counter.class) {
- while (Counter.get() < 5000) {
- for (int i = 0; i < 10; i++) {
- Counter.increment();
- }
- try {
- Thread.sleep(1);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
мы бы тоже получили верный результат 5000, вот только работал бы у нас только один поток:
- Создаём два потока и запускаем их.
- Допустим, первый поток запустился быстрее и вошёл в блок синхронизации. Второй пока запускается.
- Первый поток теперь в цикле while. Второй поток встретил блок synchonized и не получил разрешение войти.
- Первый поток работает. Второй ждёт.
- Спустя некоторое количество времени, первый поток увеличил счётчик до 5000 и вышел из циклов и блока синхронизации. Второму потоку разрешается войти внутрь.
- Первый поток завершил работу. Второй поток проверил, что условие Counter.get() < 5000 уже не выполняется и не вошёл в цикл while. Покинул блок синхронизации и завершился.
Другой вариант решения проблемы со счётчиком - сделать его методы get и increment синхронизированными. Тогда блок синхронизации в методе run не понадобится.
- public class SynchronizedCounter {
- private static int counter = 0;
- public static synchronized int get() {
- return counter;
- }
- public static synchronized void increment() {
- counter++;
- }
- }
Это почти эквивалентно коду:
- public class SynchronizedCounter {
- private static int counter = 0;
- public static int get() {
- synchronized (SynchronizedCounter.class) {
- return counter;
- }
- }
- public static void increment() {
- synchronized (SynchronizedCounter.class) {
- counter++;
- }
- }
- }
Отличия кроются в байткоде.
В первом случае метод помечается флагом SYNCHRONIZED
- public static synchronized int get();
- Flags: PUBLIC, STATIC, SYNCHRONIZED
- Code:
- stack=1, locals=0, arguments=0
- linenumber 15
- 0: getstatic SynchronizedCounter.counter:I
- 3: ireturn
Во втором, добавляются дополнительные инструкции:
- public static int get();
- Flags: PUBLIC, STATIC
- Code:
- stack=2, locals=2, arguments=0
- linenumber 19
- 0: ldc LSynchronizedCounter;.class
- 2: dup
- 3: astore_0
- 4: monitorenter
- linenumber 20
- 5: getstatic SynchronizedCounter.counter:I
- 8: aload_0
- 9: monitorexit
- 10: ireturn
- linenumber 21
- 11: astore_1
- 12: aload_0
- 13: monitorexit
- 14: aload_1
- 15: athrow
- Exceptions:
- Try Handler
- Start End Start End Type
- ----- ----- ----- ----- ----
- 5 10 11 16 Any
- 11 14 11 16 Any
Что можно представить так:
- monitorenter (SynchronizedCounter.class)
- try {
- return counter
- } finally {
- monitorexit
- }
То есть мы заходим в блок синхронизации, монитор. Возвращаем значение и выходим из монитора. Если во время критической секции произошло исключение, мы также выходим из монитора.
В байткоде synchronized метода инструкций monitorenter и monitorexit не было, но это не значит, что нет входа в монитор. Флаг SYNCHRONIZED у метода говорит JVM о том, что все эти инструкции нужно выполнить. То есть, они не появляются в коде, но сокрыты в JVM - она всё равно их выполнит.
Забегая вперёд, продемонстрирую ещё одно возможное решение проблемы. В пакете java.util.concurrent есть множество классов для различных многопоточных нужд. Одним из таких классов является AtomicInteger, который делает операции над числами атомарными.
- public class AtomicCounter {
- private static final AtomicInteger counter = new AtomicInteger();
- public static int get() {
- return counter.get();
- }
- public static void increment() {
- counter.incrementAndGet();
- }
- }
Теперь нигде не нужно добавлять блок синхронизации.
О других классах, упрощающих работу с многопоточностью, я постараюсь рассказать в следующей статье.
Синхронизация. Пример проектирования многопоточного приложения
В довершение статьи, хочу показать пример небольшого приложения и важность правильного проектирования потоков.
Допустим, у нас есть 20 файлов с данными. Нам нужно наиболее эффективно прочитать эти файлы и вывести данные на экран. Данные, по мере чтения, будут добавляться в список и уже оттуда выводиться на экран.
Есть класс, отвечающий за панель рисования, туда мы будем добавлять объекты по мере чтения:
- private PaintPanel panel;
- private void processFile(File file) {
- System.out.printf("Process file %s in %s thread\n", file.getName(), Thread.currentThread().getName());
- try(FileInputStream fis = new FileInputStream(file);
- DataInputStream dis = new DataInputStream(fis)) {
- while (dis.available() > 0) {
- Triangle triangle = Triangle.read(dis);
- panel.addTriangle(triangle);
- Thread.sleep(1);
- }
- } catch (IOException | InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
▌ Вариант 1. Без многопоточности
- for (File file : files) {
- processFile(file);
- }
- public class PaintPanelWithoutSynchronized extends PaintPanel {
- private final List<Triangle> triangles;
- public PaintPanelWithoutSynchronized(int width, int height) {
- super(width, height);
- triangles = new ArrayList<>();
- }
- @Override
- public void addTriangle(Triangle triangle) {
- triangles.add(triangle);
- SwingUtilities.invokeLater(this::repaint);
- }
- @Override
- protected void paintComponent(Graphics g) {
- super.paintComponent(g);
- for (Triangle triangle : triangles) {
- triangle.draw(g);
- }
- }
- }
Казалось бы, нет потоков - нет проблем - будет работать медленно, но зато стабильно. Не тут-то было. При работе иногда выскакивает ошибка:
- Exception in thread "AWT-EventQueue-0" java.util.ConcurrentModificationException
- at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
- at java.util.ArrayList$Itr.next(ArrayList.java:851)
- at concurrency.synchronization.PaintPanelWithoutSynchronized.paintComponent(PaintPanelWithoutSynchronized.java:27)
Ошибка ConcurrentModificationException появляется в том случае, когда предпринимается попытка доступа к списку из разных потоков.
Но погодите-ка, мы ведь не создавали потоков? Мы - нет, а вот для отрисовки интерфейса служит другой поток.
- addTriangle: main thread
- paintComponent: AWT-EventQueue-0 thread
Так что, как ни крути, а доступ к списку нужно обернуть в блок синхронизации:
- public class PaintPanelWithSynchronized extends PaintPanel {
- private final List<Triangle> triangles;
- public PaintPanelWithSynchronized(int width, int height) {
- super(width, height);
- triangles = new ArrayList<>();
- }
- @Override
- public void addTriangle(Triangle triangle) {
- synchronized (triangles) {
- triangles.add(triangle);
- }
- SwingUtilities.invokeLater(this::repaint);
- }
- @Override
- protected void paintComponent(Graphics g) {
- super.paintComponent(g);
- synchronized (triangles) {
- for (Triangle triangle : triangles) {
- triangle.draw(g);
- }
- }
- }
- }
Теперь ошибок нет, но обработка 20 файлов занимает порядка пяти минут. Причём, первые файлы читаются быстро, а потом работа замедляется. Попробуйте понять, почему так происходит.
▌ Вариант 2. Один файл - один поток
Имея 20 потоков логично предположить, что работа пройдёт в 20 раз быстрее. Так бы оно и было, имей наш процессор хотя бы 20 ядер. В противном случае мы лишь создадим дополнительную нагрузку и все наши данные могут отрисовываться не очень плавно.
- List<Thread> threads = new ArrayList<>(files.length);
- for (File file : files) {
- Thread thread = new Thread(() -> processFile(file));
- threads.add(thread);
- thread.start();
- }
- try {
- for (Thread thread : threads) {
- thread.join();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
Тем не менее, время работы теперь: 40 секунд. Это долго. Та же проблема, что замедляла работу в первом варианте, замедляет всё и сейчас. Должен быть способ избавиться от synchronized блоков и такой способ есть.
▌ Вариант 3. Использование синхронизированного списка
Для того, чтобы сделать из обычного списка синхронизированный, вызываем метод
- Collections.synchronizedList(list)
Теперь мы получим список, у которого все методы будут synchronized. Но, к сожалению, итератор, который используется в foreach таким образом не получится синхронизировать и нам придётся либо оборачивать его в synchronized блок, либо отказаться от него в пользу простого перебора элементов в цикле for.
- public class PaintPanelSynchronizedList extends PaintPanel {
- private final List<Triangle> triangles;
- public PaintPanelSynchronizedList(int width, int height) {
- super(width, height);
- triangles = Collections.synchronizedList(new ArrayList<>());
- }
- @Override
- public void addTriangle(Triangle triangle) {
- triangles.add(triangle);
- SwingUtilities.invokeLater(this::repaint);
- }
- @Override
- protected void paintComponent(Graphics g) {
- super.paintComponent(g);
- for (int i = 0; i < triangles.size(); i++) {
- triangles.get(i).draw(g);
- }
- }
- }
Время работы теперь чуть менее 4 секунд!
▌ Вариант 4. Ограничение количества потоков
Мы можем ограничить количество потоков, создав пул:
- ExecutorService es = Executors.newFixedThreadPool(maxThreads);
Теперь можно создать хоть сотню потоков, выполняться в один момент времени будут не более maxThreads потоков.
Для добавления в пул существует метод execute(Runnable r). Пул потоков нужно завершать методом shutdown или shutdownNow. Метод awaitTermination(long timeout, TimeUnit unit) ждёт завершения потоков либо указанное в параметрах время, если к этому моменту есть работающие задачи.
Подробнее об Executor и ExecutorService я постараюсь рассказать в следующей статье.
Ограничим пул пятью потоками
- ExecutorService es = Executors.newFixedThreadPool(5);
- for (File file : files) {
- es.execute( () -> processFile(file) );
- }
- es.shutdown();
- try {
- es.awaitTermination(2, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
Теперь время исполнения увеличилось до 11 секунд, но зато отрисовка проходит плавнее и мы можем быть уверены, что будь у нас сотня файлов, система не получит большую нагрузку.
Лучшей практикой, является ограничение пула потоков по количеству процессоров в системе:
- Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() );
▌ Вариант 5. java.util.concurrent
И снова я забегу вперёд и возьму что-нибудь из пакета java.util.concurrent. В этом пакете есть класс CopyOnWriteArrayList. Он настолько суров, что блоки синхронизации ему не нужны и можно беспрепятственно перебирать элементы в foreach.
- import java.util.concurrent.CopyOnWriteArrayList;
- public class PaintPanelCopyOnWriteArrayList extends PaintPanel {
- private final List<Triangle> triangles;
- public PaintPanelCopyOnWriteArrayList(int width, int height) {
- super(width, height);
- triangles = new CopyOnWriteArrayList<>();
- }
- @Override
- public void addTriangle(Triangle triangle) {
- triangles.add(triangle);
- SwingUtilities.invokeLater(this::repaint);
- }
- @Override
- protected void paintComponent(Graphics g) {
- super.paintComponent(g);
- for (Triangle triangle : triangles) {
- triangle.draw(g);
- }
- }
- }
Особых изменений в скорости работы при нескольких потоках мы не получим, но если запустить всё в одном потоке, как в первом случае, то вместо пяти минут мы получим 55 секунд!
- Process file 0.dat in main thread
- Process file 1.dat in main thread
- Process file 10.dat in main thread
- Process file 11.dat in main thread
- Process file 12.dat in main thread
- Process file 13.dat in main thread
- Process file 14.dat in main thread
- Process file 15.dat in main thread
- Process file 16.dat in main thread
- Process file 17.dat in main thread
- Process file 18.dat in main thread
- Process file 19.dat in main thread
- Process file 2.dat in main thread
- Process file 3.dat in main thread
- Process file 4.dat in main thread
- Process file 5.dat in main thread
- Process file 6.dat in main thread
- Process file 7.dat in main thread
- Process file 8.dat in main thread
- Process file 9.dat in main thread
- Elapsed time: 54942 ms
Как можно понять из названия, его магия в том, что на каждую операцию добавления в список, создаётся новый массив. Это не очень хорошо в плане памяти, зато потокобезопасно. Подробнее разберёмся в следующей статье.
Наглядная работа всех вариантов представлена в этой анимации:
Версия побольше
Выводы
* Для работы с потоками в Java есть класс Thread.
* Поток можно создать расширив класс Thread или передав класс, реализовывающий интерфейс Runnable в конструктор класса Thread.
* Остановить поток в Java стандартными средствами нельзя.
* Для остановки потоков можно воспользоваться встроенным механизмом interrupt()/interrupted()
* При выходе из программы, JVM ожидает завершения всех потоков, кроме потоков-демонов.
* Потокам можно задавать имена в конструкторе или с помощью метода setName.
* Thread.sleep(100) приостановит текущий поток на 100 миллисекунд.
* thread.join() ждёт завершения потока.
* Необходимо синхронизировать работу потоков над общими ресурсами при помощи блоков синхронизации.
* synchronized метод и synchronized блок порождают разный байт-код на с точки зрения исполнения кода JVM это одно и то же.
* Ключевое слово volatile обеспечивает видимость и атомарность при чтении и записи полей для всех потоков.
* В пакете java.util.concurrent есть множество полезных классов для работы с многопоточностью.
* synchronized блоки нужно расставлять с умом, иначе получим потерю производительности.
* ArrayList потокоНЕбезопасен. Для создания потокобезопасной копии, есть метод Collections.synchronizedList.
* Для ограничения количества потоков существуют пулы потоков.
* Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() ) создаст пул потоков с числом ядер процессора, что является эффективным решением.
* Даже если вы не создаёте потоков, это не значит, что их нет в программе.
Исходный код: GitHub