Многопоточность в Java. Основы

Если вы заметили ошибку, нажмите Shift+Enter, чтобы сообщить о ней. Все ваши сообщения можно посмотреть здесь.
27.11.2015 / 22:49 от aNNiMON
Java

В этой статье я затрону большую и сложную тему многопоточности в Java. Разумеется, обо всём в одной статье я не смогу рассказать, поэтому затрону лишь самые основные темы.

Существуют понятия процесс и поток. Процессу выделяется память, какие-то ресурсы, а также он содержит по крайней мере один поток. У потока же есть набор инструкций для выполнения кода.
Поскольку у одного процесса может быть несколько потоков, а ресурсы у процесса одни, то все эти потоки будут работать с этими одними ресурсами, причём почти одновременно. Здесь и кроется вся сложность создания многопоточных приложений.


Преимущества многопоточности
Не смотря на множество проблем, многопоточность очень полезна в пользовательских интерфейсах. Пока один поток занимается обработкой данных, второй преспокойно отрисовывает графику. В результате мы видим на экране плавную анимацию и можем взаимодействовать с элементами интерфейса, не опасаясь, что всё зависнет.

Также многопоточность позволяет улучшить скорость обработки данных: пока один поток подготавливает данные, например, выкачивая их из интернета мелкими порциями, второй и третий потоки могут их обрабатывать, а четвёртый записывать результат на диск.

Если же у нас многоядерный процессор, то потоки позволят существенно улучшить производительность, переложив работу на другие ядра.


Что ж, достаточно теории, перейдём к практике.


Создание потоков
В Java для работы с потоками служит класс Thread. Есть два способа создания нового потока, но их можно записать куда большим числом способов.

Первый вариант, это расширение класса Thread.
  1. public class CustomThread extends Thread {
  2.  
  3.     @Override
  4.     public void run() {
  5.         System.out.println("class CustomThread extends Thread { }");
  6.         try {
  7.             sleep(100); // пауза в 100 мс
  8.         } catch (InterruptedException ex) {
  9.             Thread.currentThread().interrupt();
  10.         }
  11.         System.out.println(1);
  12.     }
  13. }
  14.  
  15. // ...
  16. CustomThread thread1 = new CustomThread();
  17. thread1.start();

Второй - создание класса, реализующего интерфейс Runnable и передачу его экземпляра в конструктор класса Thread.
  1. public class CustomRunnable implements Runnable {
  2.  
  3.     @Override
  4.     public void run() {
  5.         System.out.println("class CustomRunnable implements Runnable { }");
  6.         try {
  7.             Thread.sleep(100);
  8.         } catch (InterruptedException ex) {
  9.             Thread.currentThread().interrupt();
  10.         }
  11.         System.out.println(2);
  12.     }
  13. }
  14.  
  15. // ...
  16. Thread thread2 = new Thread(new CustomRunnable());
  17. thread2.start();

Как видно, во втором случае приходится чуть больше писать, но на самом деле преимущество всё-таки у этого способа, потому что мы можем и расширить класс, и реализовать интерфейс Runnable (например, class CustomRunnable extends JPanel implements Runnable). Первый же способ удобен в том случае, если надо переопределить или добавить новые методы в класс Thread.

Если новый класс для потока создавать накладно, то можно сделать всё в анонимном классе.
  1. Thread thread3 = new Thread() {
  2.     @Override
  3.     public void run() {
  4.         System.out.println("new Thread() { }");
  5.         try {
  6.             sleep(100);
  7.         } catch (InterruptedException ex) {
  8.             Thread.currentThread().interrupt();
  9.         }
  10.         System.out.println(3);
  11.     }
  12. };
  13. thread3.start();

Точно так же и для Runnable
  1. Thread thread4 = new Thread(new Runnable() {
  2.     @Override
  3.     public void run() {
  4.         System.out.println("new Thread(new Runnable() { })");
  5.         try {
  6.             Thread.sleep(100);
  7.         } catch (InterruptedException ex) {
  8.             Thread.currentThread().interrupt();
  9.         }
  10.         System.out.println(4);
  11.     }
  12. });
  13. thread4.start();

Эти два способа хорошо подходят для случаев, когда кода в методе run очень мало.

В Java 8 появились лямбда-выражения, а вместе с ними и ещё более короткие способы создания потока:
  1. Thread thread5 = new Thread(() -> {
  2.     System.out.println("new Thread(() -> { })");
  3.     try {
  4.         Thread.sleep(100);
  5.     } catch (InterruptedException ex) {
  6.         Thread.currentThread().interrupt();
  7.     }
  8.     System.out.println(5);
  9. });
  10. thread5.start();

Или можно воспользоваться ссылкой на метод. Пожалуй, это самый компактный способ.
  1. Thread thread6 = new Thread(Main::threadMethod);
  2. thread6.start();
  3.  
  4. // ...
  5. private static void threadMethod() {
  6.     System.out.println("new Thread(Main::threadMethod)");
  7.     try {
  8.         Thread.sleep(100);
  9.     } catch (InterruptedException ex) {
  10.         Thread.currentThread().interrupt();
  11.     }
  12.     System.out.println(6);
  13. }

Можно ещё побаловаться с Reflection API, но это уж как-нибудь сами.


Прерывание потоков
Да-да, именно прерывание, а не остановка или пауза. Потоки нельзя остановить стандартными средствами, потому что с этим связано множество проблем. Поэтому методы stop, suspend, pause и destroy в классе Thread помечены как deprecated. Так что остановку приходится делать самому.

Вообще, поток в Java работает до тех пор, пока выполняется код в его методе run. Если в run будет простая операция, скажем, вывод текста в консоль, то поток сразу же завершится после этой операции. Если же мы в потоке записываем данные в цикле в 100 файлов, то поток будет работать до тех пор, пока не выполнит свою задачу.

И, к слову, если поток работает в фоне и вы закрываете программу, то процесс этой программы всё равно останется висеть в фоне, пока не завершатся все потоки. Чтобы по окончанию программы гарантированно получить завершение потоков, нужно сделать из потока демона :webdemon:. Звучит забавно, но так и есть:
  1. thread.setDaemon(true)

Итак, у нас есть поток, который выполняет в цикле какую-то повторяющуюся операцию и нам нужно в некоторый момент времени его завершить. Самый очевидный способ, это завести логическую переменную и проверять её состояние. Чаще всего делают так:
  1. private boolean isRunning;
  2.  
  3. public void stop() {
  4.     isRunning = false;
  5. }
  6.  
  7. @Override
  8. public void run() {
  9.     isRunning = true;
  10.     String name = Thread.currentThread().getName();
  11.     System.out.printf("Thread %s started\n", name);
  12.  
  13.     int counter = 0;
  14.     while (isRunning) {
  15.         System.out.printf("Thread %s. counter = %d\n", name, counter);
  16.         counter++;
  17.  
  18.         try {
  19.             Thread.sleep(100);
  20.         } catch (InterruptedException ex) {
  21.             System.out.printf("Thread %s interrupted\n", name);
  22.             Thread.currentThread().interrupt();
  23.         }
  24.     }
  25. }

Этот вариант имеет право на жизнь, но тогда поток нужно останавливать, вызывая метод stop. Во-первых, это не всегда удобным, а во-вторых, нужно лишнее поле в классе. Поэтому можно воспользоваться стандартными средствами, которые предоставляет класс Thread. Речь о флаге interrupted, который как раз и может послужить заменой нашему isRunning.
  1. @Override
  2. public void run() {
  3.     String name = Thread.currentThread().getName();
  4.     System.out.printf("Thread %s started\n", name);
  5.  
  6.     int counter = 0;
  7.     while (!Thread.interrupted()) {
  8.         System.out.printf("Thread %s. counter = %d\n", name, counter);
  9.         counter++;
  10.         try {
  11.             Thread.sleep(100);
  12.         } catch (InterruptedException ex) {
  13.             System.out.printf("Thread %s interrupted\n", name);
  14.             Thread.currentThread().interrupt();
  15.         }
  16.     }
  17. }

Для остановки потока теперь нам достаточно вызвать thread.interrupt(). В первом же случае, вызов этого метода не приведёт к желаемому результату.
  1. // первый пример
  2. thread1.start();
  3. Thread.sleep(500);
  4. thread1.interrupt(); // не остановит поток
  5. Thread.sleep(50);
  6. System.out.printf("Thread %s isAlive: %b\n", thread1.getName(), thread1.isAlive());
  7. runnable.stop();
  8. System.out.printf("Thread %s isAlive: %b\n", thread1.getName(), thread1.isAlive());
  9.  
  10. // вывод
  11. Thread Thread-0 started
  12. Thread Thread-0. counter = 0
  13. Thread Thread-0. counter = 1
  14. Thread Thread-0. counter = 2
  15. Thread Thread-0. counter = 3
  16. Thread Thread-0. counter = 4
  17. Thread Thread-0 interrupted
  18. Thread Thread-0. counter = 5
  19. Thread Thread-0 interrupted
  20. Thread Thread-0. counter = 6
  21. Thread Thread-0 interrupted
  22. // ...
  23. Thread Thread-0. counter = 28
  24. Thread Thread-0 interrupted
  25. Thread Thread-0. counter = 29
  26. Thread Thread-0 isAlive: true
  27. Thread Thread-0 interrupted
  28. Thread Thread-0 finished
  29. Thread Thread-0 isAlive: false

Второй пример:
  1. // второй пример
  2. thread2.start();
  3. Thread.sleep(500);
  4. thread2.interrupt();
  5. Thread.sleep(50);
  6. System.out.printf("Thread %s isAlive: %b\n", thread2.getName(), thread2.isAlive());
  7.  
  8. // вывод
  9. Thread second started
  10. Thread second. counter = 0
  11. Thread second. counter = 1
  12. Thread second. counter = 2
  13. Thread second. counter = 3
  14. Thread second. counter = 4
  15. Thread second interrupted
  16. Thread second finished
  17. Thread second isAlive: false

Потокам можно давать имена:
  1. Thread thread2 = new Thread(new CustomRunnable2(), "second");
  2. // или
  3. thread2.setName("second");


Вы уже заметили, что во всех блоках catch я пишу код Thread.currentThread().interrupt();? Это хорошая практика. Если во время паузы произошло исключение InterruptedException, то вы помечаете поток как interrupted и тем самым завершаете его выполнение (если использовали Thread.interrupted()). Если же оставлять блок catch пустым, то ничего не пометится. В некоторых случаях это может быть оправдано, но в остальном, рекомендую писать так.

С учётом исключения InterruptedException, можно переписать первый пример:
  1. private boolean volatile isRunning;
  2.  
  3. public void stop() {
  4.     isRunning = false;
  5. }
  6.  
  7. @Override
  8. public void run() {
  9.     isRunning = true;
  10.     String name = Thread.currentThread().getName();
  11.     System.out.printf("Thread %s started\n", name);
  12.  
  13.     int counter = 0;
  14.     try {
  15.         while (isRunning) {
  16.             System.out.printf("Thread %s. counter = %d\n", name, counter);
  17.             counter++;
  18.  
  19.             Thread.sleep(100);
  20.         }
  21.     } catch (InterruptedException ex) {
  22.         System.out.printf("Thread %s interrupted\n", name);
  23.         Thread.currentThread().interrupt();
  24.     }
  25. }

При исключении мы покинем цикл while и поток завершится. Оба метода runnable.stop() и thread.interrupt() теперь успешно завершают поток. Обратите внимание на ключевое слово volatile для поля isRunning, я объясню его значение чуть позже.

  1. // третий пример
  2. thread3.start();
  3. Thread.sleep(500);
  4. // now both methods works fine
  5. thread3.interrupt();
  6. //runnable3.stop();
  7. Thread.sleep(50);
  8. System.out.printf("Thread %s isAlive: %b\n", thread3.getName(), thread3.isAlive());
  9.  
  10. // вывод
  11. Thread Thread-1 started
  12. Thread Thread-1. counter = 0
  13. Thread Thread-1. counter = 1
  14. Thread Thread-1. counter = 2
  15. Thread Thread-1. counter = 3
  16. Thread Thread-1. counter = 4
  17. Thread Thread-1 interrupted
  18. Thread Thread-1 finished
  19. Thread Thread-1 isAlive: false


Проблема доступа к общим ресурсам
Как я говорил ранее, при работе с потоками может возникнуть множество проблем, поэтому важно уметь проектировать приложение так, чтобы этих проблем избежать. К счастью, в Java есть огромное множество решений, с которыми я вас постепенно буду знакомить.


Первая проблема - доступ к общим ресурсам из нескольких потоков. Допустим, есть некоторый счётчик:
  1. public class Counter {
  2.  
  3.     private static int counter = 0;
  4.  
  5.     public static int get() {
  6.         return counter;
  7.     }
  8.  
  9.     public static void increment() {
  10.         counter++;
  11.     }
  12. }

И есть два потока, которые увеличивают его значение до тех пор, пока оно не станет равным 5000.
  1. private static void printCounter() {
  2.     while (Counter.get() < 5000) {
  3.         for (int i = 0; i < 10; i++) {
  4.             Counter.increment();
  5.         }
  6.         try {
  7.             Thread.sleep(1);
  8.         } catch (InterruptedException ex) {
  9.             Thread.currentThread().interrupt();
  10.         }
  11.     }
  12. }
  13.  
  14. // ...
  15. Runnable runnable = CounterProblem::printCounter;
  16. Thread thread1 = new Thread(runnable);
  17. Thread thread2 = new Thread(runnable);
  18.  
  19. thread1.start();
  20. thread2.start();
  21.  
  22. // ожидание завершения потоков
  23. thread1.join();
  24. thread2.join();
  25. System.out.println("All threads finished. Counter = " + Counter.get());

Очевидно, мы должны получить значение 5000, но не тут то было:
  1. All threads finished. Counter = 5003

Запускаем ещё раз:
  1. All threads finished. Counter = 5009

Почему так?
Тут вступает проблема видимости объекта.
Дело в том, что, изменив поле одним потоком, второй не сразу может увидеть это изменение или увидит его в другом порядке. Это связано с другим понятием атомарность.

Атомарной называется такая операция, которая может быть выполнена за один неделимый шаг.
Например, атомарной операцией будет присвоение числу int некоторой константы. А неатомарной - сумма двух чисел.
Чтобы пояснить, я окунусь глубже в JVM.
Некоторые операции в Java состоят из нескольких инструкций байт-кода:
  1. private int counter;
  2.  
  3. counter = 6;
  4. // bipush 6
  5. // putstatic Counter.counter
  6.  
  7. counter += 6;
  8. // getstatic Counter.counter
  9. // bipush 6
  10. // iadd
  11. // putstatic Counter.counter

Подвох вот в чём. В первом случае изменение поля происходит за одну инструкцию putstatic. Во втором, между getstatic и putstatic аж две инструкции. Ничто не мешает другому потоку, пока выполняется сложение, перезаписать значение поля counter.

Детальнее обо всём этом можно прочесть в статье: Модель памяти в примерах и не только / Хабрахабр


На 32-битных системах, присвоение значения типу long или double тоже будет неатомарной операцией. Но объявив поле с ключевым словом volatile, JVM будет гарантировать атомарность и видимость всем простым операциям, а именно чтению и записи поля. Вот только в нашем случае со счётчиком это не поможет - операция инкремента неатомарна.

Нам же нужен способ как-то блокировать другие потоки на время выполнения неатомарных операций. И этот способ есть - синхронизация.
В блоке синхронизации может находиться только один поток, остальные должны ожидать, когда этот поток покинет блок.

Для объявления блоков синхронизации в Java служит ключевое слово synchronized:
  1. synchronized (lockObject) {
  2.     // критическая секция
  3. }

В качестве lockObject может выступать final поле или класс:
  1. private final Object lock = new Object();
  2. synchronized (lock) { }
  3.  
  4. synchronized(this) { }
  5.  
  6. synchronized(Main.class) { }


Теперь, чтобы решить нашу проблему со счётчиком, достаточно добавить неатомарную операцию в блок синхронизации.
  1. private static void printCounter() {
  2.     while (Counter.get() < 5000) {
  3.         synchronized (Counter.class) {
  4.             for (int i = 0; i < 10; i++) {
  5.                 Counter.increment();
  6.             }
  7.         }
  8.         try {
  9.             Thread.sleep(1);
  10.         } catch (InterruptedException ex) {
  11.             Thread.currentThread().interrupt();
  12.         }
  13.     }
  14. }
  15.  
  16. // Выводит
  17. All threads finished. Counter = 5000

На самом деле в этом коде тоже есть проблема, попробуйте найти её после прочтения раздела. Или читайте комментарии к статье.

Как это будет работать:
  - Мы создали два потока и запустили их.
  - Допустим, первый поток запустился быстрее и вошёл в цикл while. Второй пока запускается.
  - Первый поток видит блок synchronized. Выполняется проверка - нет ли сейчас в этом блоке других потоков? Нет, поэтому первый поток заходит в блок. Второй пока что вошёл в цикл while.
  - Первый поток сейчас в цикле for увеличивает счётчик. Второй поток доходит до блока synchronized. Снова выполняется проверка и поскольку поток внутри есть, разрешение войти внутрь не получено, а значит второй поток ждёт.
  - Первый поток всё ещё в цикле for. Второй поток всё так же ждёт.
  - Наконец, первый поток выходит из цикла for и покидает область синхронизации. Второй поток получает разрешение войти внутрь.

Таким образом, получается синхронизированная работа потоков.


Блоки синхронизации следует расставлять с умом. Если бы мы сделали вот так:
  1. private static void printCounter() {
  2.     synchronized (Counter.class) {
  3.         while (Counter.get() < 5000) {
  4.             for (int i = 0; i < 10; i++) {
  5.                 Counter.increment();
  6.             }
  7.             try {
  8.                 Thread.sleep(1);
  9.             } catch (InterruptedException ex) {
  10.                 Thread.currentThread().interrupt();
  11.             }
  12.         }
  13.     }
  14. }

мы бы тоже получили верный результат 5000, вот только работал бы у нас только один поток:
  - Создаём два потока и запускаем их.
  - Допустим, первый поток запустился быстрее и вошёл в блок синхронизации. Второй пока запускается.
  - Первый поток теперь в цикле while. Второй поток встретил блок synchonized и не получил разрешение войти.
  - Первый поток работает. Второй ждёт.
  - Спустя некоторое количество времени, первый поток увеличил счётчик до 5000 и вышел из циклов и блока синхронизации. Второму потоку разрешается войти внутрь.
  - Первый поток завершил работу. Второй поток проверил, что условие Counter.get() < 5000 уже не выполняется и не вошёл в цикл while. Покинул блок синхронизации и завершился.


Другой вариант решения проблемы со счётчиком - сделать его методы get и increment синхронизированными. Тогда блок синхронизации в методе run не понадобится.
  1. public class SynchronizedCounter {
  2.  
  3.     private static int counter = 0;
  4.  
  5.     public static synchronized int get() {
  6.         return counter;
  7.     }
  8.  
  9.     public static synchronized void increment() {
  10.         counter++;
  11.     }
  12. }

Это почти эквивалентно коду:
  1. public class SynchronizedCounter {
  2.  
  3.     private static int counter = 0;
  4.  
  5.     public static int get() {
  6.         synchronized (SynchronizedCounter.class) {
  7.             return counter;
  8.         }
  9.     }
  10.  
  11.     public static void increment() {
  12.         synchronized (SynchronizedCounter.class) {
  13.             counter++;
  14.         }
  15.     }
  16. }

Отличия кроются в байткоде.
В первом случае метод помечается флагом SYNCHRONIZED
  1. public static synchronized int get();
  2. Flags: PUBLIC, STATIC, SYNCHRONIZED
  3. Code:
  4.     stack=1, locals=0, arguments=0
  5.           linenumber      15
  6.        0: getstatic       SynchronizedCounter.counter:I
  7.        3: ireturn        

Во втором, добавляются дополнительные инструкции:
  1. public static int get();
  2. Flags: PUBLIC, STATIC
  3. Code:
  4.     stack=2, locals=2, arguments=0
  5.           linenumber      19
  6.        0: ldc             LSynchronizedCounter;.class
  7.        2: dup
  8.        3: astore_0
  9.        4: monitorenter
  10.           linenumber      20
  11.        5: getstatic       SynchronizedCounter.counter:I
  12.        8: aload_0
  13.        9: monitorexit
  14.       10: ireturn        
  15.           linenumber      21
  16.       11: astore_1
  17.       12: aload_0
  18.       13: monitorexit
  19.       14: aload_1
  20.       15: athrow
  21. Exceptions:
  22.     Try           Handler
  23.     Start  End    Start  End    Type
  24.     -----  -----  -----  -----  ----
  25.     5      10     11     16     Any
  26.     11     14     11     16     Any

Что можно представить так:
  1. monitorenter (SynchronizedCounter.class)
  2. try {
  3.     return counter
  4. } finally {
  5.     monitorexit
  6. }

То есть мы заходим в блок синхронизации, монитор. Возвращаем значение и выходим из монитора. Если во время критической секции произошло исключение, мы также выходим из монитора.

В байткоде synchronized метода инструкций monitorenter и monitorexit не было, но это не значит, что нет входа в монитор. Флаг SYNCHRONIZED у метода говорит JVM о том, что все эти инструкции нужно выполнить. То есть, они не появляются в коде, но сокрыты в JVM - она всё равно их выполнит.


Забегая вперёд, продемонстрирую ещё одно возможное решение проблемы. В пакете java.util.concurrent есть множество классов для различных многопоточных нужд. Одним из таких классов является AtomicInteger, который делает операции над числами атомарными.
  1. public class AtomicCounter {
  2.  
  3.     private static final AtomicInteger counter = new AtomicInteger();
  4.  
  5.     public static int get() {
  6.         return counter.get();
  7.     }
  8.  
  9.     public static void increment() {
  10.         counter.incrementAndGet();
  11.     }
  12. }

Теперь нигде не нужно добавлять блок синхронизации.
О других классах, упрощающих работу с многопоточностью, я постараюсь рассказать в следующей статье.


Синхронизация. Пример проектирования многопоточного приложения
В довершение статьи, хочу показать пример небольшого приложения и важность правильного проектирования потоков.
Допустим, у нас есть 20 файлов с данными. Нам нужно наиболее эффективно прочитать эти файлы и вывести данные на экран. Данные, по мере чтения, будут добавляться в список и уже оттуда выводиться на экран.

Есть класс, отвечающий за панель рисования, туда мы будем добавлять объекты по мере чтения:
  1. private PaintPanel panel;
  2.  
  3. private void processFile(File file) {
  4.     System.out.printf("Process file %s in %s thread\n", file.getName(), Thread.currentThread().getName());
  5.     try(FileInputStream fis = new FileInputStream(file);
  6.         DataInputStream dis = new DataInputStream(fis)) {
  7.         while (dis.available() > 0) {
  8.             Triangle triangle = Triangle.read(dis);
  9.             panel.addTriangle(triangle);
  10.             Thread.sleep(1);
  11.         }
  12.     } catch (IOException | InterruptedException ie) {
  13.         Thread.currentThread().interrupt();
  14.     }
  15. }


Вариант 1. Без многопоточности.
  1. for (File file : files) {
  2.     processFile(file);
  3. }
  4.  
  5.  
  6. public class PaintPanelWithoutSynchronized extends PaintPanel {
  7.  
  8.     private final List<Triangle> triangles;
  9.  
  10.     public PaintPanelWithoutSynchronized(int width, int height) {
  11.         super(width, height);
  12.         triangles = new ArrayList<>();
  13.     }
  14.  
  15.     @Override
  16.     public void addTriangle(Triangle triangle) {
  17.         triangles.add(triangle);
  18.         SwingUtilities.invokeLater(this::repaint);
  19.     }
  20.  
  21.     @Override
  22.     protected void paintComponent(Graphics g) {
  23.         super.paintComponent(g);
  24.         for (Triangle triangle : triangles) {
  25.             triangle.draw(g);
  26.         }
  27.     }
  28. }

Казалось бы, нет потоков - нет проблем - будет работать медленно, но зато стабильно. Не тут-то было. При работе иногда выскакивает ошибка:

  1. Exception in thread "AWT-EventQueue-0" java.util.ConcurrentModificationException
  2.     at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
  3.     at java.util.ArrayList$Itr.next(ArrayList.java:851)
  4.     at concurrency.synchronization.PaintPanelWithoutSynchronized.paintComponent(PaintPanelWithoutSynchronized.java:27)

Ошибка ConcurrentModificationException появляется в том случае, когда предпринимается попытка доступа к списку из разных потоков.
Но погодите-ка, мы ведь не создавали потоков? Мы - нет, а вот для отрисовки интерфейса служит другой поток.

  1. addTriangle: main thread
  2. paintComponent: AWT-EventQueue-0 thread

Так что, как ни крути, а доступ к списку нужно обернуть в блок синхронизации:
  1. public class PaintPanelWithSynchronized extends PaintPanel {
  2.  
  3.     private final List<Triangle> triangles;
  4.  
  5.     public PaintPanelWithSynchronized(int width, int height) {
  6.         super(width, height);
  7.         triangles = new ArrayList<>();
  8.     }
  9.  
  10.     @Override
  11.     public void addTriangle(Triangle triangle) {
  12.         synchronized (triangles) {
  13.             triangles.add(triangle);
  14.         }
  15.         SwingUtilities.invokeLater(this::repaint);
  16.     }
  17.  
  18.     @Override
  19.     protected void paintComponent(Graphics g) {
  20.         super.paintComponent(g);
  21.         synchronized (triangles) {
  22.             for (Triangle triangle : triangles) {
  23.                 triangle.draw(g);
  24.             }
  25.         }
  26.     }
  27. }

Теперь ошибок нет, но обработка 20 файлов занимает порядка пяти минут. Причём, первые файлы читаются быстро, а потом работа замедляется. Попробуйте понять, почему так происходит.


Вариант 2. Один файл - один поток.

Имея 20 потоков логично предположить, что работа пройдёт в 20 раз быстрее. Так бы оно и было, имей наш процессор хотя бы 20 ядер. В противном случае мы лишь создадим дополнительную нагрузку и все наши данные могут отрисовываться не очень плавно.
  1. List<Thread> threads = new ArrayList<>(files.length);
  2. for (File file : files) {
  3.     Thread thread = new Thread(() -> processFile(file));
  4.     threads.add(thread);
  5.     thread.start();
  6. }
  7. try {
  8.     for (Thread thread : threads) {
  9.         thread.join();
  10.     }
  11. } catch (InterruptedException e) {
  12.     Thread.currentThread().interrupt();
  13. }

Тем не менее, время работы теперь: 40 секунд. Это долго. Та же проблема, что замедляла работу в первом варианте, замедляет всё и сейчас. Должен быть способ избавиться от synchronized блоков и такой способ есть.


Вариант 3. Использование синхронизированного списка.

Для того, чтобы сделать из обычного списка синхронизированный, вызываем метод
  1. Collections.synchronizedList(list)

Теперь мы получим список, у которого все методы будут synchronized. Но, к сожалению, итератор, который используется в foreach таким образом не получится синхронизировать и нам придётся либо оборачивать его в synchronized блок, либо отказаться от него в пользу простого перебора элементов в цикле for.
  1. public class PaintPanelSynchronizedList extends PaintPanel {
  2.  
  3.     private final List<Triangle> triangles;
  4.  
  5.     public PaintPanelSynchronizedList(int width, int height) {
  6.         super(width, height);
  7.         triangles = Collections.synchronizedList(new ArrayList<>());
  8.     }
  9.  
  10.     @Override
  11.     public void addTriangle(Triangle triangle) {
  12.         triangles.add(triangle);
  13.         SwingUtilities.invokeLater(this::repaint);
  14.     }
  15.  
  16.     @Override
  17.     protected void paintComponent(Graphics g) {
  18.         super.paintComponent(g);
  19.         for (int i = 0; i < triangles.size(); i++) {
  20.             triangles.get(i).draw(g);
  21.         }
  22.     }
  23. }

Время работы теперь чуть менее 4 секунд!


Вариант 4. Ограничение количества потоков.

Мы можем ограничить количество потоков, создав пул:
  1. ExecutorService es = Executors.newFixedThreadPool(maxThreads);

Теперь можно создать хоть сотню потоков, выполняться в один момент времени будут не более maxThreads потоков.

Для добавления в пул существует метод execute(Runnable r). Пул потоков нужно завершать методом shutdown или shutdownNow. Метод awaitTermination(long timeout, TimeUnit unit) ждёт завершения потоков либо указанное в параметрах время, если к этому моменту есть работающие задачи.
Подробнее об Executor и ExecutorService я постараюсь рассказать в следующей статье.

Ограничим пул пятью потоками
  1. ExecutorService es = Executors.newFixedThreadPool(5);
  2. for (File file : files) {
  3.     es.execute( () -> processFile(file) );
  4. }
  5. es.shutdown();
  6. try {
  7.     es.awaitTermination(2, TimeUnit.MINUTES);
  8. } catch (InterruptedException e) {
  9.     Thread.currentThread().interrupt();
  10. }

Теперь время исполнения увеличилось до 11 секунд, но зато отрисовка проходит плавнее и мы можем быть уверены, что будь у нас сотня файлов, система не получит большую нагрузку.

Лучшей практикой, является ограничение пула потоков по количеству процессоров в системе:
  1. Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() );

Вариант 5. java.util.concurrent

И снова я забегу вперёд и возьму что-нибудь из пакета java.util.concurrent. В этом пакете есть класс CopyOnWriteArrayList. Он настолько суров, что блоки синхронизации ему не нужны и можно беспрепятственно перебирать элементы в foreach.
  1. import java.util.concurrent.CopyOnWriteArrayList;
  2.  
  3. public class PaintPanelCopyOnWriteArrayList extends PaintPanel {
  4.  
  5.     private final List<Triangle> triangles;
  6.  
  7.     public PaintPanelCopyOnWriteArrayList(int width, int height) {
  8.         super(width, height);
  9.         triangles = new CopyOnWriteArrayList<>();
  10.     }
  11.  
  12.     @Override
  13.     public void addTriangle(Triangle triangle) {
  14.         triangles.add(triangle);
  15.         SwingUtilities.invokeLater(this::repaint);
  16.     }
  17.  
  18.     @Override
  19.     protected void paintComponent(Graphics g) {
  20.         super.paintComponent(g);
  21.         for (Triangle triangle : triangles) {
  22.             triangle.draw(g);
  23.         }
  24.     }
  25. }


Особых изменений в скорости работы при нескольких потоках мы не получим, но если запустить всё в одном потоке, как в первом случае, то вместо пяти минут мы получим 55 секунд!
  1. Process file 0.dat in main thread
  2. Process file 1.dat in main thread
  3. Process file 10.dat in main thread
  4. Process file 11.dat in main thread
  5. Process file 12.dat in main thread
  6. Process file 13.dat in main thread
  7. Process file 14.dat in main thread
  8. Process file 15.dat in main thread
  9. Process file 16.dat in main thread
  10. Process file 17.dat in main thread
  11. Process file 18.dat in main thread
  12. Process file 19.dat in main thread
  13. Process file 2.dat in main thread
  14. Process file 3.dat in main thread
  15. Process file 4.dat in main thread
  16. Process file 5.dat in main thread
  17. Process file 6.dat in main thread
  18. Process file 7.dat in main thread
  19. Process file 8.dat in main thread
  20. Process file 9.dat in main thread
  21.  
  22. Elapsed time: 54942 ms

Как можно понять из названия, его магия в том, что на каждую операцию добавления в список, создаётся новый массив. Это не очень хорошо в плане памяти, зато потокобезопасно. Подробнее разберёмся в следующей статье.

Наглядная работа всех вариантов представлена в этой анимации:
threads.gif
Версия побольше


Выводы
  * Для работы с потоками в Java есть класс Thread.
  * Поток можно создать расширив класс Thread или передав класс, реализовывающий интерфейс Runnable в конструктор класса Thread.
  * Остановить поток в Java стандартными средствами нельзя.
  * Для остановки потоков можно воспользоваться встроенным механизмом interrupt()/interrupted()
  * При выходе из программы, JVM ожидает завершения всех потоков, кроме потоков-демонов.
  * Потокам можно задавать имена в конструкторе или с помощью метода setName.
  * Thread.sleep(100) приостановит текущий поток на 100 миллисекунд.
  * thread.join() ждёт завершения потока.
  * Необходимо синхронизировать работу потоков над общими ресурсами при помощи блоков синхронизации.
  * synchronized метод и synchronized блок порождают разный байт-код на с точки зрения исполнения кода JVM это одно и то же.
  * Ключевое слово volatile обеспечивает видимость и атомарность при чтении и записи полей для всех потоков.
  * В пакете java.util.concurrent есть множество полезных классов для работы с многопоточностью.
  * synchronized блоки нужно расставлять с умом, иначе получим потерю производительности.
  * ArrayList потокоНЕбезопасен. Для создания потокобезопасной копии, есть метод Collections.synchronizedList.
  * Для ограничения количества потоков существуют пулы потоков.
  * Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() ) создаст пул потоков с числом ядер процессора, что является эффективным решением.
  * Даже если вы не создаёте потоков, это не значит, что их нет в программе.


Исходный код: GitHub
java, многопоточность
+9   9   0
3228
Похожие статьи
AsyncTask в Java ME
Трансляция проигрываемой в AIMP музыки в статус ВКонтакте
Бэкап сообщений Вконтакте с использованием VK API и Java 8

  © aNNiMON (Melnik Software)
Онлайн: 21 (2/19)
 
Яндекс.Метрика