Разное

Java потоки синхронизация: Синхронизация потоков, блокировка объекта и блокировка класса

Содержание

Синхронизация потоков. Курс «Программирование на Java»

В многопоточной программе не всегда один поток работает независимо от другого. Бывает, они обмениваются данными или обрабатывают одни и те же объекты.

В таких случаях возникают проблемы правильной организации взаимодействия нитей так, чтобы они не мешали работе друг друга. Один поток должен знать об изменениях, внесенных другим потоком.

Синхронизация потоков – это настройка их взаимодействия. Рассмотрим пример, в котором два разных потока работают с одним и тем же объектом:

public class NoSynch {
    public static void main(String[] args) 
           throws InterruptedException {
        Client client = new Client(1000);
        Thread operation = 
                  new Operation(client, 1000);
        Thread operation1 = 
                  new Operation(client, 500);
        operation.start();
        operation1.start();
        operation.join();
        operation1.join();
        System. out.println(client.getBill());
    }
}
 
class Operation extends Thread {
    private Client mClient;
    private int mPay;
 
    Operation(Client client, int pay) {
        mClient = client;
        mPay = pay;
    }
 
    @Override
    public void run() {
        System.out.println(this.getName() + 
                   ": " + mClient.getBill());
        if (mClient.getBill() - mPay >= 0) {
            try {
                sleep(1000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            mClient.changeBill(mPay);
        }
        System.out.println(this.getName() + 
                    ": " + mClient.getBill());
        System.out.println(this.getName() + " stop");
    }
}
 
class Client {
    private int mBill;
 
    Client(int bill) {
        this.mBill = bill;
    }
 
    int getBill() {
        return mBill;
    }
 
    void changeBill(int pay) {
        mBill -= pay;
    }
}

Результат:

Thread-0: 1000
Thread-1: 1000
Thread-0: 0
Thread-0 stop
Thread-1: -500
Thread-1 stop
-500

Задержка sleep’ом искусственно надумана, чтобы разделить проверку условия и вычитание. В реальных программах между одним действием и другим могло бы быть множество промежуточных, на выполнение которых требуется время.

У клиента на счету оказалась отрицательная сумма, хотя по логике вещей одна из операций вычитания не должна была бы выполняться. Проблема возникла из-за того, что в каждом потоке на момент проверки (mClient.getBill() - mPay >= 0) было по 1000 на счету. И каждый поток «решил», что денег достаточно. После этого поток-0 уменьшил сумму. Когда поток-1 приступил к вычитанию, денег на счету уже не было.

Чтобы исключить подобную логическую ошибку, надо как-то блокировать объект client, чтобы пока он обрабатывается в одном потоке, другие не могли его изменять. Как вариант делать это в методе в run():

class Operation extends Thread {
    private final Client mClient;
    private int mPay;
 
    Operation(Client client, int pay) {
        mClient = client;
        mPay = pay;
    }
 
    @Override
    public void run() {
        System. out.println(this.getName() + 
                   ": " + mClient.getBill());
        synchronized (mClient) {
            if (mClient.getBill() - mPay >= 0) {
                try {
                    sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                mClient.changeBill(mPay);
            }
        }
        System.out.println(this.getName() + 
                   ": " + mClient.getBill());
        System.out.println(this.getName() + " stop");
    }
}

Результат:

Thread-0: 1000
Thread-1: 1000
Thread-0: 0
Thread-0 stop
Thread-1: 0
Thread-1 stop
0

И хотя оба потока сначала увидели 1000. Когда началась проверка и вычитание, действия выполнялись совместно, другой поток в это время доступ к объекту не имел.

Блок кода с ключевым словом synchronized может одновременно выполняться только одним потоком.

В примере синхронизация выполнена по объекту. В Java также возможна синхронизация по методу, т. е. пока методом пользуется один поток, другому он недоступен. Если в вышеприведенной программе мы по отдельности синхронизируем методы getBill() и changeBill() объекта, это ничего не даст. Решением может быть объединение проверки и вычитания в один метод:

class Operation1 extends Thread {
    private Client1 mClient;
    private int mPay;
    Operation1(Client1 client, int pay) {
        mClient = client;
        mPay = pay;
    }
 
    @Override
    public void run() {
        try {
            sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        mClient.changeBill(mPay);
    }
}
 
class Client1 {
    private int mBill;
    Client1(int bill) {
        this.mBill = bill;
    }
 
    synchronized void changeBill(int pay) {
        System.out.println(mBill);
        if (mBill-pay >= 0) {
            mBill -= pay;
            System.out.println(mBill);
        }
    }
}

У каждой нити есть собственный кэш, куда копируются данные, с которыми она работает. Таким образом, одна нить не имеет доступа к данным другой и не может прочитать произошедшие изменения, а читает устаревшие значения из обычной памяти. Если потоки работают с общими данными, можно запретить их помещение в кэш, используя ключевое слово volatile. Например:

private volatile int mBill;

Синхронизация Потоков Java (Разные Стеки)

нестатические переменные существуют в разных местах в памяти для каждого потока

Это неверно, поэтому ответ на

если код, который выполняет поток, содержит некоторую переменную класса v1, тогда каждый поток имеет свой собственный «экземпляр» v1 (другой адрес памяти), и ни один другой поток не может «коснуться» его… не так ли

— нет. Потоки могут касаться экземпляров объектов, выделенных и модифицированных другими потоками, и это зависит от программиста, чтобы это не повлияло на правильность программы.

Переменные члена класса существуют в одном месте в памяти для каждого класса, а не в потоке. Это правда, что между барьерами памяти (подумайте о начале { и конце } of synchronized), что поток может иметь кеш состояния объекта, но это не то же самое, что и язык, требующий хранения на потоке. «Память для каждого потока» — это его стек, который не содержит элементов объекта * — только ссылки на объекты.

Лучший способ подумать об этом заключается в том, что в каждом куче есть одно место в куче, но может быть много раз чтение и запись с использованием этой памяти в то же время.

Я вижу, как вы пришли к выводам, которые вы сделали, если слышали, что потоки распределяют объекты в разных частях кучи. У некоторых JVM есть оптимизация, при которой они распределение потока-нити, но это не мешает другим потокам получать доступ к этим объектам.

Локальное распределение потоков

Если распределитель действительно реализован, как показано в листинге 1, общее поле heapStart быстро станет значительным узким местом concurrency, так как каждое распределение будет включать в себя блокировку, защищающую это поле. Чтобы избежать этой проблемы, большинство JVM используют поточно-локальные блоки распределения, где каждый поток распределяет большую часть памяти из кучи и запросов на мелкие запросы обслуживания последовательно из этого потока-локального блока. В результате количество раз, когда поток должен получить общую блокировку кучи, значительно уменьшается, улучшая concurrency.

* — возможно, что оптимизация JVM позволяет некоторым объектам быть выделенными в стеке.

Что такое синхронизация Java?

В информатике поток представляет собой последовательность инструкций внутри программы, которые могут выполняться независимо от другого кода. Многие потоки могут запускаться одновременно внутри программы. Все Java-программы имеют по крайней мере один поток, известный как основной поток, который создается JVM при запуске программы, когда метод main() вызывается с основным потоком. Это объект, который имеет свои собственные регистры, стек и сегмент кода, которые могут работать параллельно с другими потоками процесса (процесс представляет собой набор потоков).

Что такое многопоточность?

Многопоточность — это процесс одновременного выполнения нескольких потоков. Это означает, что он использует два или более «потоков» выполнения, работающих вместе для выполнения задачи. Каждая последовательность команд имеет свой собственный уникальный поток управления, который не зависит от всех остальных.

Синхронизация в Java

В общем случае синхронизация используется для защиты доступа к ресурсам, к которым обращаются одновременно. Одним из преимуществ использования нескольких потоков в приложении является то, что каждый поток выполняется асинхронно. Существует много ситуаций, когда несколько потоков должны совместно использовать доступ к общим объектам. Например, в системе базы данных вам может не потребоваться, чтобы один поток обновлял запись базы данных, а другой поток пытался ее прочитать. В этих случаях мы должны гарантировать, что ресурс будет использоваться только по одному потоку за раз. В противном случае два или более потока могут получить доступ к одному и тому же ресурсу в одно и то же время, каждый не знает о действиях другого. Java позволяет вам координировать действия нескольких потоков с использованием синхронизированных методов и синхронизированных операторов. Доступ к объекту, для которого нужно координировать доступ, осуществляется посредством использования синхронизированных методов. Эти методы объявляются с ключевым словом synchronized. Только один синхронизированный метод может быть вызван для объекта в данный момент времени. Это позволяет синхронизировать методы в нескольких потоках от конфликта друг с другом. Ниже приведен общий вид синхронизированного оператора:

Синтаксис

synchronized(objectidentifier) {
// Доступ к общим переменным и другим общим ресурсам
}

synchronized(objectidentifier) {

   // Доступ к общим переменным и другим общим ресурсам

}

Параметр objectidentifier является ссылкой на объект, блокировка которого связана с монитором, который представляет собой синхронизированный оператор. Язык программирования Java предоставляет две основные идиомы синхронизации: синхронизированные методы и синхронизированные операторы.

Что такое синхронизированные методы и синхронизированные операторы?

Синхронизированные методы позволяют простую стратегию предотвращения помех потоков и ошибок согласованности памяти: если объект виден более чем одному потоку, все считывания или записи в переменные этого объекта выполняются с помощью синхронизированных методов. Невозможно чередование двух вызовов синхронизированных методов на одном объекте. Когда один поток выполняет синхронизированный метод для объекта, все другие потоки, которые вызывают синхронизированные методы для одного и того же блока объектов (приостанавливают выполнение) до тех пор, пока первый поток не будет выполнен с объектом. Чтобы синхронизировать метод, просто добавьте синхронизированное ключевое слово в его объявление:

public synchronized void increament(){
count++;
}

public synchronized void increament(){

    count++;

}

Синхронизированный блок

Синхронизирующий блок обеспечивает атомарность связки операторов кода. Если вам нужно синхронизировать доступ к объекту класса или вы хотите, чтобы часть метода была синхронизирована с объектом, вы можете использовать для него синхронизированный блок.

public void add(int value){
synchronized(this){
this.count += value;
}
}

public void add(int value){

  synchronized(this){

     this.count += value;

  }

}

Одно существенное различие между синхронизированным методом и блоком состоит в том, что Синхронизированный блок обычно уменьшает объем блокировки. Поскольку объем блокировки обратно пропорционален производительности, всегда лучше блокировать только критический раздел кода. Кроме того, синхронизированный блок может вызывать Java.lang.NullPointerException, если выражение, предоставленное блоку, как параметр, имеет значение null, что не соответствует синхронным методам.

Источник: http://net-informations. com/Java/cJava/synchronization.htm

Потоки и синхронизация — ni0xx

Безопасность потоков

Если несколько потоков обращаются к одной изменяемой переменной без соответсвующей синхронизации, программа сломана. Есть 3 способа это исправить:

  • не шарить переменную между потоками
  • сделать переменную неизменяемой
  • использовать синхронизацию всегда, когда потоки пытаются получить к переменной доступ

Класс является потоко-безопасным если он правильно себя ведет, когда к нему обращаются из нескольких потоков, независимо от расписания или чередование выполнения этих потоков в среде выполнения, и без дополнительной синхронизации или другой координации со стороны вызывающего кода.

Атомарность (Atomicy)

Атомарные операции — операции, выполняющиеся как единое целое либо не выполняющиеся вовсе.
Атомарная операция — это та операция, которая является атомарной в отношении всех операций, включая себя, которые работают на том же состоянии.

Классы, реализующий атомарные потоко-безопасные операции находятся в пакете
java.util.concurrent.atomic

Блокировка (Locking)

synchronized методы ( == synchronized(this) {} или вместо this -> class для static )
syncronized блоки

Совместное использование объектов

Видимость
public class NoVisibility {
    private static boolean ready;
    private static int number;
    private static class ReaderThread extends Thread {
        public void run() {
            while (!ready)
                Thread.yield();
            System.out.println(number);
        }
}
    public static void main(String[] args) {
        new ReaderThread().start();
        number = 42;
        ready = true;
} }

NoVisibility может иметь бесконечный цикл, поскольку видимость ready может никогда не стать видимымой для ReaderThread.
Даже NoVisibility может вывести 0, так как запись в ready может стать видимой для readerthread до того, как записи в number. Это явление известно как reodering.
Не существует никакой гарантии, что операции в одном потоке будут осуществляться в порядке, указанном в программе, пока их порядок не обнаруживается внутри этого потока, даже если изменение порядка очевидно для других потоков

Устаревший данные (Stale data)

При доступе к переменной из другого потока , в переменной могут оказаться устаревшие данные. Это результат механизма кэширования jvm. Для отключения этого механизма используется ключевое слово volatile .

Публикация объектов

Потеря this в конструкторе
При передаче создаваемого объекта в конструкторе существует опасность ,
что другой объект начнет использовать не до конца построенный объект с не проинициализированными полями

Техника ограничения потоков
  • Ограничения в объявлении (Ad-hoc Thread Confinement) на уровне интерфейса и документации — Переложить заботу о синхронизации на разработчика/имплементатора (например Swing)
  • Использование стэка (Stack Confinement) Создание и использование только локальных экземпляров внутри метода(ов)
  • Использование ThreadLocal
private static ThreadLocal<Connection> connectionHolder
    = new ThreadLocal<Connection>() {
        public Connection initialValue() {
            return DriverManager. getConnection(DB_URL);
} };
public static Connection getConnection() {
    return connectionHolder.get();
}

Для каждого потока будет создаваться один свой Connection

<back

Неизменяемость (Immutability)

Неизменяемые объекты всегда потоко-безопасны
Объект не изменяем если:

  • Его состояние не может быть изменено после создания
  • Все его поля объявлены как final
  • Объект правильно создан (нет потери this в конструкторе)

Чтобы опубликовать объект безопасно, ссылка на объект и состояние объекта должны стать видимыми другим потокам в одновременно. Правильно созданный объект может быть безопасно опубликован с помощью:

  • Инициализация ссылки на объект с помощью статического инициализатора
  • Сохранение ссылки на объект с помощью volatile поля или AtomicReference
  • Сохранение ссылки на объект в final поле правильно построенного объекта; или
  • Сохранение ссылки на объект в поле, защищенное блокировкой (синхронизацией)
Составление объектов (Composing objects)

Процесс проектирования потоко-безопасногоо класса включает 3 основных элемента:

  • Определить переменные, которые формируют состояние объекта
  • Определить инварианты, которые ограничивают переменные состояния
  • Задожить политику управления конкурентным доступом к состоянию объекта

Избегать синхронизации (это) в Java?

Мои два цента в 2019 году, хотя этот вопрос уже мог быть решен.

Блокировка «this» — это неплохо, если вы знаете, что делаете, но за сценой стоит блокировка «this» (что, к сожалению, позволяет синхронизированное ключевое слово в определении метода).

Если вы действительно хотите, чтобы пользователи вашего класса могли «украсть» вашу блокировку (то есть не допустить, чтобы другие потоки имели с ней дело), ​​вы действительно хотите, чтобы все синхронизированные методы ожидали, пока запущен другой метод синхронизации, и так далее. Он должен быть преднамеренным и хорошо продуманным (и, следовательно, задокументированным, чтобы помочь вашим пользователям понять это).

Более подробно, наоборот, вы должны знать, что вы «получаете» (или «теряете»), если вы блокируете недоступную блокировку (никто не может «украсть» вашу блокировку, вы полностью контролируете и так далее). ..).

Для меня проблема в том, что ключевое слово synchronized в сигнатуре определения метода позволяет программистам слишком легко не думать о том, что блокировать, о чем очень важно думать, если вы не хотите сталкиваться с проблемами в мульти программа

Нельзя утверждать, что «как правило» вы не хотите, чтобы пользователи вашего класса могли делать такие вещи, или что «обычно» вы хотите . .. Это зависит от того, какую функциональность вы кодируете. Вы не можете сделать правило большого пальца, поскольку вы не можете предсказать все варианты использования.

Рассмотрим, например, устройство печати, которое использует внутреннюю блокировку, но затем люди пытаются использовать ее из нескольких потоков, если они не хотят, чтобы их выходные данные чередовались.

Если ваша блокировка доступна вне класса или нет, это ваше решение как программиста, исходя из того, какие функциональные возможности есть в классе. Это часть API. Например, вы не можете перейти от синхронизированного (этого) к синхронизированному (provateObjet), не рискуя нарушить изменения в коде, использующем его.

Примечание 1: я знаю, что вы можете добиться того, что синхронизируется (это) «достигает», используя явный объект блокировки и выставляя его, но я думаю, что это не нужно, если ваше поведение хорошо документировано и вы действительно знаете, что означает блокировка «this».

Примечание 2: я не согласен с аргументом, что если какой-то код случайно украл вашу блокировку, это ошибка, и вы должны ее решить. В некотором смысле это тот же аргумент, что и я могу сделать все мои методы публичными, даже если они не предназначены для публичности. Если кто-то «случайно» называет мой метод закрытым, это ошибка. Зачем включать эту аварию в первую очередь !!! Если способность украсть ваш замок — проблема для вашего класса, не позволяйте это. Так просто.

Параллелизм в Java | Блог только про Java

Следует избегать особого типа ошибок, имеющего отношение к многозадачностии называемого взаимной блокировкой, которая происходит в том случае, когда потоки исполнения имеют циклическую зависимость от пары синхронизированных объек­тов.

Допустим, один поток исполнения входит в монитор объекта Х, а другой — в мо­нитор объекта У. Если поток исполнения в объекте Х попытается вызвать любой син­хронизированный метод для объекта У, он будет блокирован, как и предполагалось. Читать →

Опубликовано в Параллелизм в Java
|

Метки Deadlock, взаимная блокировка в Java, взаимная блокировка в Java пример программы, урок взаимная блокировка в Java
|

|

В параллельном API поддерживается средство, называемое исполнителем и предназначенное для создания потоков исполнения и управления ими.

В этом отношении исполнитель служит альтернативой управлению потоками исполне­ния средствами класса Thread. Читать →

Опубликовано в Параллелизм в Java
|

Метки Java Executor уроки, Java исполнитель Executor пример кода, исполнитель java примеры
|

|

В версии JDК 7 внедрен новый класс синхронизации под названием Phaser. Главное его назначение — синхронизировать потоки исполнения, которые пред­ставляют одну или несколько стадий (или фаз) выполнения действия. Например, в прикладной программе может быть несколько потоков исполнения, реализующих три стадии обработки заказов.

На первой стадии отдельные потоки исполне­ния используются для того, чтобы проверить сведения о клиенте, наличие товара на складе и его цену. По завершении этой стадии остаются два потока исполнения, где на второй стадии вычисляется стоимость доставки и сумма соответствующего налога, а на заключительной стадии подтверждается оплата и определяется ори­ентировочное время доставки.

В прошлом для синхронизации нескольких пото­ков исполнения в такой прикладной программе пришлось бы немало потрудиться. А с появлением класса Phaser этот процесс значительно упростился. Читать →

Опубликовано в Параллелизм в Java
|

Метки как использовать Класс Phaser в Java, Класс Phaser Java, Класс Phaser примеры Java
|

|

Планировщик потоков использует приоритеты потоков исполнения, чтобы принять решение, когда разрешить исполнение каждому потоку. Теоретически вы­сокоприоритетные потоки исполнения получают больше времени ЦП, чем низко­приоритетные.

А на практике количество времени ЦП, которое получает потоки сполнения, нередко зависит не только от его приоритета, но и от ряда других факторов. ( Например, особенности реализации многозадачности в операционной системе могут оказывать влияние на относительную доступность времени ЦП. ). Читать →

Опубликовано в Параллелизм в Java
|

Метки как управлять приоритетом потоков в Java, приоритет потоков Java пример, приоритеты потоков Java
|

|

Вероятно, наиболее интересным с точки зрения синхронизации является класс Exchanger, предназначенный для упрощения процесса обмена данными между двумя потоками исполнения.

Принцип действия класса Exchanger очень прост: он ожидает до тех пор, пока два отдельных потока исполнения не вызовут его метод exchange(). Как только это произойдет, он произведет обмен данны­ми, предоставляемыми обоими потоками. Такой механизм обмена данными не только изящен, но и прост в применении. Читать →

Опубликовано в Параллелизм в Java
|

Метки Exchanger, параллельное программирование java, синхронизация потоков Java
|

|

В программировании нередко возникают такие ситуации, когда два или больше потока должны находиться в режиме ожидания в предопределенной точке исполне­ния до тех пор, пока все эти потоки не достигнут данной точки.

Для этой цели в параллельном API предоставляется класс CyclicBarrier. Он позволяет определить объект синхронизации, который приостанавливается до тех пор, пока определенное количество потоков исполнения не достигнет некоторой барьерной точки. Читать →

Опубликовано в Параллелизм в Java
|

Метки CyclicBarrier, параллельное программирование java, синхронизация потоков Java
|

|

Иногда требуется, чтобы поток исполнения находился в режиме ожидания до тех пор, пока не наступит одно (или больше) событие.

Для этих целей в парал­лельном API предоставляется класс CountDownLatch, реализующий самоблокировку с обратным отсчетом. Объект этого класса изначально создается с количеством событий, которые должны произойти до того момента, как будет снята самоблокировка. Всякий раз, когда происходит событие, значение счетчика уменьшается.

Как только значение счетчика достигнет нуля, самоблокировка будет снята. Читать →

Опубликовано в Параллелизм в Java
|

Метки CountDownLatch, java поток, параллельное программирование java, пример кода Java
|

|

Первым сразу же распознаваемым среди объектов синхронизации является семафор, реализуемый в классе Semaphore.

Семафор управляет доступом к обще­му ресурсу с помощью счетчика. Если счетчик больше нуля, доступ разрешается,а если он равен нулю, то в доступе будет отказано.

В действительности этот счетчик подсчитывает разрешения, открывающие доступ к общему ресурсу. Следовательно,чтобы получить доступ к ресурсу, поток исполнения должен получить у семафора разрешение на доступ. Читать →

Опубликовано в Параллелизм в Java
|

Метки Semaphore Java примеры кода, Semaphore в языке Java, реализация семафора в Java
|

|

Реализация потоков в Java

Потоки в Java 21 декабря, 2014

Через потоковую модель Java представлены наиболее общие свойства, связанные с многопоточным программированием. Вся прелесть работы с потоками реализуется через класс Thread. У класса есть несколько статических методов для работы с потоками. Эти методы перечислены в таблице 1.1.

МетодОписание
 getId() Методом в качестве результата возвращается идентификатор потока — уникальное целое число, генерируемое при создании потока.
 getName() Метод в качестве результата возвращает имя потока, из которого вызывается этот метод.
 getPriority() Методом в качестве результата возвращает приоритет потока.
 getThreadGroup() Методом в качестве значения возвращается группа, к которой принадлежит текущий поток.
 holdsLock() Метод в качестве значения возвращает логическое значение. Значение равно true, если метод удерживает монитор объекта, переданного в качестве аргумента методу. В противном случае возвращается значение false. Монитор — объект, который используется для блокировки ресурса потоком и недопущения одновременного обращения к ресурсу разных потоков. Концепция монитора используется для синхронизации работы потоков.
 interrupt() Методом выполняется прерывание потока
 isAlive() Метод используется для того, чтобы определить, используется ли поток.
 join() Метод-инструкция ожидания завершения потока.
 run() Метод определяет точку входа в поток. Программный код потока задается путем переопределения этого метода.
 setName() Методом задается имя потока(имя потока указывается в виде текстовой строки аргументом метода).
 setPriority() Методом задается приоритет потока(приоритет передается аргументом методу).
 sleep() Метод используется для приостановки выполнения потока. Время(в миллисекундах), на которое выполняется приостановка в работе потока, указывается аргументом метода.
 start() Метод для запуска потока. При вызове этого метода автоматически запускается на выполнение метод run(), объявленный в интерфейсе Runnable.
 wait() Метод переводит поток в режим ожидания.
 yield() Метод временно приостанавливает текущий поток и позволяет выполнение других потоков.

Здесь следует дать некоторые пояснения относительно выполнения и не выполнения потоков. Поскольку потоки в общем случае выполняются параллельно(одновременно), то, как говорится, возможны варианты. В том случае, если разные потоки в процессе выполнения вступают в конфликт, например, в части доступа к системным ресурсам, в расчет принимается такой немаловажный показатель, как приоритет потока.

Приоритет потока — это число, которое определяет «важность» этого потока. Само по себе значение приоритета не принципиально, важно только, у какого потока приоритет больше. Тот поток, у которого поток выше, в случае «конфликта интересов» имеет преимущество. Поток с более высоким приоритетом может прервать выполнение менее приоритетного потока.

Работу потока, кроме окончательного и бесповоротного завершения, можно приостановить. Выполнение потока приостанавливается на определенное время, после чего выполнение потока возобновляется. Отдельной темой является синхронизация потоков.

Ведь в процессе выполнения потоки могут(и, как правило, так и происходит) обращаться к одним и тем же ресурсам. Проблемы, которые при этом возникают, намного серьезнее, чем может показаться на первый взгляд. Решаются эти проблемы как раз через процедуру синхронизации потоков.

Видео по теме:

Leave a Reply

По какой причине «синхронизированный» не разрешен в методах интерфейса Java 8?

Хотя сначала может показаться очевидным, что можно было бы поддерживать модификатор synchronized в методах по умолчанию, оказалось, что это было бы опасно и поэтому было запрещено.

Синхронизированные методы — это сокращение для метода, который ведет себя так, как если бы все тело было заключено в блок synchronized , объект блокировки которого является получателем. Может показаться разумным распространить эту семантику и на методы по умолчанию; в конце концов, это тоже методы экземпляра с получателем. (Обратите внимание, что методы synchronized — это полностью синтаксическая оптимизация; они не нужны, они просто более компактны, чем соответствующий блок synchronized . Есть разумный аргумент, что это была преждевременная синтаксическая оптимизация в первом место, и что синхронизированные методы вызывают больше проблем, чем решают, но этот корабль отплыл очень давно.)

Итак, чем они опасны? Синхронизация — это блокировка. Блокировка — это координация общего доступа к изменяемому состоянию.У каждого объекта должна быть политика синхронизации, которая определяет, какие блокировки охраняют какие переменные состояния. (См. Java Concurrency на практике, раздел 2.4.)

Многие объекты используют в качестве политики синхронизации Java Monitor Pattern (JCiP 4.1), в котором состояние объекта охраняется его внутренней блокировкой. В этом шаблоне нет ничего волшебного или особенного, но он удобен, и использование ключевого слова synchronized в методах неявно предполагает этот шаблон.

Это класс, которому принадлежит состояние, которое определяет политику синхронизации этого объекта. Но интерфейсы не владеют состоянием объектов, в которые они смешаны. Таким образом, использование синхронизированного метода в интерфейсе предполагает конкретную политику синхронизации, но такую, для которой у вас нет разумных оснований предполагать, поэтому вполне может быть, что использование синхронизации не обеспечивает никакой дополнительной безопасности потоков (возможно, вы выполняете синхронизацию с неправильной блокировкой).Это даст вам ложное чувство уверенности в том, что вы что-то сделали с безопасностью потоков, и ни одно сообщение об ошибке не сообщит вам, что вы принимаете неправильную политику синхронизации.

Уже достаточно сложно постоянно поддерживать политику синхронизации для одного исходного файла; еще сложнее гарантировать, что подкласс правильно придерживается политики синхронизации, определенной его суперклассом. Попытка сделать это между такими слабо связанными классами (интерфейсом и, возможно, множеством классов, реализующих его) будет почти невозможна и очень подвержена ошибкам.

Учитывая все эти аргументы против, какой будет аргумент? Похоже, они в основном о том, чтобы интерфейсы вели себя как черты. Хотя это вполне объяснимое желание, центром разработки методов по умолчанию является эволюция интерфейса, а не «Черты -». Мы стремились к тому, чтобы последовательно достичь двух целей, но когда одно противоречило другому, нам приходилось выбирать в пользу основной цели дизайна.

Коллекция

Java | Разница между Synchronized ArrayList и CopyOnWriteArrayList

Коллекция Java | Разница между Synchronized ArrayList и CopyOnWriteArrayList

Поскольку ArrayList не синхронизирован, если несколько потоков попытаются изменить ArrayList одновременно, то окончательный результат будет недетерминированным.Следовательно, синхронизация ArrayList необходима для обеспечения безопасности потоков в многопоточной среде.

Эту синхронизацию Arraylist можно выполнить двумя способами:

  1. Использование Collections. synchronizedList ()
  2. Использование CopyOnWriteArrayList (COWAL).

Поскольку оба используются для обеспечения безопасности потоков в Arraylist, возникает вопрос, когда использовать COWAL, а когда — Collection.synchronizedList (). Это можно понять, поняв различия между ними.Основное различие между синхронизированными ArrayList и CopyOnWriteArrayList заключается в их производительности, масштабируемости и способах обеспечения безопасности потоков.

Почему CopyOnWriteArrayList появился, когда Collection.synchronizedList () уже присутствовал

Изначально SynchronizedList использовался в многопоточной среде, но имел некоторые ограничения. Все его методы чтения и записи были синхронизированы с самим объектом списка, т.е. если поток выполняет метод add (), он блокирует другие потоки, которые хотят, чтобы итератор получил доступ к элементам в списке.Кроме того, только одному потоку разрешалось выполнять итерацию элементов списка за раз, что было неэффективно. Это было довольно жестко.

Таким образом, потребовалась более гибкая система сбора, которая позволяет:

  1. Несколько потоков, одновременно выполняющих операции чтения.
  2. Один поток выполняет операцию чтения, а другой выполняет операцию записи одновременно.
  3. Только один поток может выполнять операцию записи, в то время как другие потоки могут выполнять операции чтения одновременно.

Чтобы преодолеть эти проблемы, наконец, в Java 5 был представлен новый набор классов коллекций под названием Concurrent Collections , в котором было CopyOnWriteArrayList .Класс CopyOnWriteArrayList разработан для включения таких функций последовательной записи и одновременного чтения.

Основные различия между ними:

  1. Блокировка потоков: Синхронизированный список блокирует весь список, чтобы обеспечить синхронизацию и безопасность потоков во время операции чтения или записи, тогда как CopyOnWriteArrayList не блокирует весь список во время этих операций.

    Класс CopyOnWriteArrayList работает в соответствии со своим именем i.е. копирование при записи , которые выполняют разные действия для операций чтения и записи. Для каждой операции записи (добавления, установки, удаления и т. Д.) Он создает новую копию элементов в списке. а для операций чтения (get, iterator, listIterator и т. д.) он работает с другой копией. Таким образом, во время операции чтения нет дополнительных накладных расходов, и ее операция чтения выполняется быстрее, чем Collections.SynchronizedList (). Таким образом, COWAL лучше для операции чтения, чем синхронизированный список.

  2. Операции записи: Для операции записи в ArrayList, операции записи COWAL медленнее, чем для Коллекций.synchronizedList (), , поскольку он использует Re-entrantLock. Метод записи всегда будет создавать копию существующего массива и вносить изменения в копию, а затем, наконец, обновлять изменчивую ссылку массива, чтобы указать на этот новый массив. Следовательно, во время операции записи возникают огромные накладные расходы. Вот почему операции записи CopyOnWriteArrayList медленнее, чем Collections.synchronizedList ().
  3. Поведение во время модификации: Синхронизированный список — это отказоустойчивый итератор , т.е.е. он выбрасывает исключение ConcurrentModifcationException, когда список изменяется, когда один поток выполняет итерацию по нему, тогда как CopyOnWriteArrayList является отказоустойчивым итератором , то есть он не будет генерировать исключение ConcurrentModifcationException, даже если список изменяется, когда один поток повторяет его.
  4. Число работающих потоков: Только одному потоку разрешено работать с синхронизированным списком, блокируя полный объект списка, что влияет на его производительность, поскольку другие потоки ожидают, тогда как в случае COWAL , несколько потоков могут работать с ArrayList , поскольку он работает с отдельной клонированной копией для операций обновления / изменения, что делает его производительность быстрее .
  5. Итерация внутри блока: Во время итерации в синхронизированном списке убедитесь, что итерация выполняется внутри синхронизированного блока, тогда как в CopyOnWriteArrayList мы можем безопасно выполнять итерацию вне синхронизированного блока.
  6. Когда использовать SynchronizedList?

    1. Так как в CopyOnWriteArrayList для каждой операции обновления / изменения создается новая отдельная клонированная копия, и на JVM есть накладные расходы на выделение памяти и слияние клонированной копии с исходной копией. Таким образом, в этом случае SynchronizedList — лучший вариант.
    2. Когда размер Arraylist большой.
    3. Когда использовать CopyOnWriteArrayList?

      1. CopyOnWriteArrayList обеспечивает чтение без блокировки, что означает гораздо лучшую производительность, если имеется больше потоков чтения и запись происходит довольно низко.
      2. Когда размер Arraylist небольшой.

      SynchronizedList против CopyOnWriteArrayList

      SynchronizedList CopyOnWriteArrayList
      Он блокирует весь список для обеспечения безопасности потоков во время операций чтения и записи. Он блокирует список только во время операции записи, поэтому во время операции чтения блокировка отсутствует, поэтому несколько потоков выполняют операции чтения одновременно.
      Это итератор, работающий без сбоев. Это отказоустойчивый итератор.
      Представлен в версии Java 1.2. Представлен в версии Java 1.5.
      Итерация списка должна выполняться внутри синхронизированного блока, иначе он столкнется с недетерминированным поведением. Он может безопасно выполнять итерацию вне синхронизированного блока.
      Если какой-либо другой поток пытается изменить список, в то время как один поток выполняет итерацию этого списка, он выдаст исключение ConcurrentModificationException. Он не позволяет изменять список во время обхода, и он не будет вызывать исключение ConcurrentModificationException, если список изменяется другим потоком во время обхода.
      Лучше всего использовать, когда arraylist большой, а операции записи больше, чем операции чтения в списке. Лучше всего использовать, когда ArrayList маленький или операция чтения больше, чем операция записи.

      Вниманию читателя! Не прекращайте учиться сейчас. Получите все важные концепции Java Foundation, и Коллекции с курсом Основы Java и Коллекции Java по приемлемой для студентов цене и будьте готовы к отрасли.

(PDF) Синхронизация потоков данных в распределенных средах мультимодальной обработки сигналов в реальном времени с использованием стандартного оборудования

Рис.1. Распределенное мультимодальное приложение слияния датчиков

, оценивающее местоположение активных динамиков на основе синхронизации временных меток блоков и

.

простота конструкции. Мы используем NDFS-II не только для сбора модальных данных multi-

, но также для исследования smartspaces

, а также для объединения и обработки данных с нескольких датчиков в реальном времени. Com-

plex system исследовательские приложения, такие как распределенное моделирование

алгоритмов оптимизации муравьиных колоний (ACO) были недавно включены в спектр приложений

.

3.2. Расширенные функции и синхронизация

Функции, помимо транспортировки необработанных данных, реализованы в потоках

[8], например во время кодирования видеоисточника с использованием выбираемых кодеков

или прямого преобразования. Расширенные функции

могут быть реализованы поверх NDFS-II API. Эта стратегия дает

пользователям возможность отказаться или принять новые экспериментальные расширения

и позволяет нашей команде исследовать более сложные расширения

без изменения основных функций системы.

Фреймворк поставляется с набором потоков, основанных на наших исследовательских потребностях

. Потоки обычно создаются пользователями для передачи определенных данных порта

, полученных от их датчиков.

Однако различные типы датчиков, такие как микрофоны

и камеры, могут давать различную частоту дискретизации и данных

. Датчики, осуществляющие захват с одинаковой частотой на разных хостах

, могут иметь разную скорость передачи данных из-за различий в настройке оборудования

, локальных смещений и дрейфов тактовых импульсов, непрерывного времени ядра

, задержек планировщика или непредсказуемых операций с дисками.

Эти недостатки потока возникают при использовании операционных систем, не работающих в режиме реального времени, и обычного оборудования. Конечно, с помощью аппаратных решений

, таких как Genlock, некоторые из этих датчиков можно синхронизировать

априори. Однако существуют сценарии, в которых новые датчики

не имеют решений для аппаратной синхронизации,

или обычное оборудование используется для сбора и обработки данных

из-за финансовых ограничений. В то время как мы успешно синхронизировали

хронизированных источников разных типов, захваченных с товарным

аппаратных средств post hoc [9], необходимость специальной синхронизации

становится более очевидной при слиянии нескольких датчиков

в реальном времени и распределенной обработке сигналов. .

Для решения этой проблемы мы разработали несколько простых стратегий

специальной синхронизации программного обеспечения. Эксперименты

проводятся в операционных системах, не работающих в режиме реального времени, с использованием программного обеспечения

сгенерированных меток времени и блоков, называемых расширенными метками времени

, с использованием локальных часов, синхронизированных с центральным сервером

NTP. Однако возможно использовать временные метки, генерируемые аппаратными решениями в алгоритмах синхронизации,

, что, вероятно, обеспечит лучшую точность синхронизации.

Чтобы проиллюстрировать поток данных в распределенной среде

, давайте взглянем на две основные политики организации очередей

NDFS-II. После инициализации потока пользователь может установить политику блокировки или неблокирующей передачи данных

. Блокирующая политика

icy просто блокирует приложение, если пользователь захочет повторно получить данные

, когда их нет в очереди. Неблокирующая политика

icy никогда не блокирует приложение и позволяет потоку сбрасывать данные

в случае, если его очередь заполнена, обычно из-за недостаточной мощности обработки

.В то время как политика блокировки полезна в исследовательских сценариях

ios, где не может произойти потеря данных, неблокирующие политики

используются предпочтительно в сценариях реального времени с захватом сигнала в реальном времени

или задачах тяжелой обработки.

В первом сценарии, представленном на рис. 1, мы не предполагаем, что

отметка блока и синхронизация отметки времени (см. Блок «Датчик Fu-

»). Возникают два основных эффекта, которые определяют основные правила

конвейера обработки: во-первых, источники видео и аудиосигнала

захватывают в реальном времени с разными частотами дискретизации и передачи данных.

Секунда каждая ветвь вводит свою собственную временную задержку обработки

из-за различного времени обработки. Простое объединение этих активных мультимодальных сигналов

в конце конвейера, без отбрасывания любых данных (путем применения политики блокировки), потребует от пользователя

синхронизации сигналов вручную и из-за изменения

скорости передачи данных. Знания о частотах дискретизации источников должны использовать

. В этом случае даже без перегруженных процессорных узлов и без обработки конкретной ветви выборки

результаты слияния будут отклоняться друг от друга.

Во втором сценарии мы решаем проблемы, связанные с разными скоростями передачи данных sam-

и перегрузкой процессора, используя неблокирующую политику.

icy: теперь пользователь получает данные асинхронно. Это по-прежнему

не устраняет временную задержку обработки, вносимую различными ветвями обработки

, но позволяет удовлетворительным способом объединения данных

вместе. Объединенные сигналы больше не отдаляются друг от друга

, но может наблюдаться сдвиг с постоянной задержкой.

То есть аудиосигнал можно услышать перед соответствующей видеопоследовательностью

, потому что конвейер видео работает медленнее, поэтому

потоков

объединяются, но все еще не синхронизированы.

Наконец, давайте посмотрим на приложение синхронизации

API, разработанное на основе описанных основных политик организации очередей:

Мы увеличиваем комбинированную блочную и временную метку (BTStamp) до

сигнала в узлах захвата конвейеров (слева). на рисунке).

BTStamp генерируется инкрементным счетчиком (отметка блока

) и локальными часами (отметка времени), предполагая, что время sys-

tems было заранее синхронизировано с глобальным сервером NTP

.Временная метка позволяет смешивать сигналы датчиков

с разными скоростями передачи данных, в то время как блочная метка позволяет исходить-

данных из одного источника, разделять их между несколькими вычислительными узлами

, чтобы синхронизироваться обратно (например, на нескольких хостах или

множественных). основные процессоры). Кроме того, метка блока используется для

отслеживания пропущенных кадров. Он может найти будущее приложение

в статистических алгоритмах специальной регрессии, таких как Widrow

или фильтры Калмана, оценивающие межкадровую синхронизацию.At data fu-

KIP-353: Улучшение синхронизации временных меток потоков Kafka — Apache Kafka

Текущее состояние : Принято

Тема обсуждения : ссылка

JIRA :

КАФКА-3514

Получение сведений о проблеме …
ПОЛОЖЕНИЕ ДЕЛ


Пожалуйста, продолжайте обсуждение в списке рассылки, а не комментируйте вики (обсуждения вики быстро становятся громоздкими).

Сегодня логический вывод времени и синхронизация потоков Kafka Streams сложно понять, а также привносит много недетереминизма в порядок обработки, когда задача выбирается из нескольких входных потоков (также называемых тематическими разделами из Kafka).

Более конкретно, задача потока может содержать несколько входных тематических разделов. И, следовательно, нам нужно решить: 1) какой раздел темы выбрать следующую запись для обработки для этой задачи, 2) как будет продвигаться время потока при обработке записей.Сегодня эта логика определяется следующим образом:

a) Каждый тематический раздел поддерживает монотонно увеличивающуюся временную метку, которая зависит от временных меток буферизованных записей.

b) При обработке задачи мы выбираем головную запись из раздела, который имеет наименьшее время (выбор головной записи гарантирует упорядочение смещения внутри раздела) И имеет непустые буферизованные записи.

c) Время потока задачи определяется как наименьшее значение для всех меток времени всех его разделов и, следовательно, также монотонно увеличивается.

Вышеупомянутое поведение основано на мотивации того, что для таких операций, как объединения, мы хотим синхронизировать временные метки в нескольких входных потоках, чтобы мы могли избежать «неупорядоченного» данных в нескольких разделах тем с максимальной эффективностью. (обратите внимание, что в рамках одного раздела темы мы всегда обрабатываем данные в соответствии с порядком смещения).

Мы заметили, что вышеупомянутый механизм имеет следующую проблему:

  • Пользователи не могут контролировать компромиссы неупорядоченности / недетерминизма и задержки: мы всегда пытаемся обрабатывать задачу до тех пор, пока хотя бы один из его разделов содержит данные в буфере; это означает, что когда другие разделы в конечном итоге поставили в очередь больше данных, мы можем обнаружить, что их временные метки на самом деле меньше, чем то, что мы уже обработали, т.е.е. вышедший из строя. Во время повторной обработки, поскольку данные разделов будут доступны немедленно, мы можем выбрать записи, которые следуют порядку меток времени, и его результат не будет таким же, как при первом выполнении этой задачи, то есть недетерминированное поведение, когда мы выполняем задача несколько раз.

В этом KIP мы предлагаем улучшить эту ситуацию, чтобы позволить пользователям настраивать поведение выбора следующей записи для обработки на основе времени потока.

Во-первых, мы введем термин «обрабатываемость» задачи, как показано ниже:

  • Задача может быть обработана только тогда, когда все ее входные тематические разделы имеют буферизованные данные из выборок потребителей, следовательно, все известна текущая «позиция» метки времени его разделов тем.Таким образом, можно безопасно решить, из какого раздела выбрать следующую запись.

Чтобы решить проблему, связанную с тем, что задача может иметь многовариантный входящий трафик от ее разделов (примите во внимание: один раздел имеет 10 КБ записей в секунду, а другой — только 1 запись в секунду). Мы также позволим пользователям «принудительно» обрабатывать непроработанную задачу с помощью следующей добавленной конфигурации:

 public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms" // максимальное время настенных часов, в течение которого задача может оставаться не обработанной, хотя все еще содержит некоторые буферизованные данные хотя бы в одном из ее разделов 


Когда задача принудительно обрабатывается с помощью этой конфигурации, это потенциально может привести к неупорядоченности (т. е.е. вы можете обрабатывать запись с отметкой времени t0, в то время как время вашего потока было увеличено до t1> t0), и принудительное выполнение задачи также может быть недетерминированным и может привести к выходным данным, отличным от случая детерминированной повторной обработки. Пользователи могут использовать эту конфигурацию, чтобы контролировать, сколько они могут заплатить за задержку (т. Е. Не обрабатывать задачу), чтобы уменьшить возможность неупорядочения: если для нее установлено значение Long.MAX, мы будем ждать выполнения задачи неопределенно долго, прежде чем любые данные могут быть обработаны для этой задачи; в другом крайнем случае, когда он установлен в 0, мы всегда будем обрабатывать задачу всякий раз, когда в ней есть какие-то данные, и помним, что такая принудительная обработка может вызвать нарушение порядка.И чтобы позволить пользователям получать качественные уведомления о таких потенциальных событиях «вне порядка», мы также введем метрику на уровне задачи:

. По умолчанию мы установим для нее значение 0, чтобы соответствовать текущему поведению.

90 -136

 МЕТРИЧЕСКОЕ / АТРИБУТНОЕ ИМЯ 
 ОПИСАНИЕ 
 MBEAN NAME 
общая скорость обработки

вызовов этой задачи в секунду / общее количество принудительных вызовов процессов на данный момент.
 kafka.streams: type = stream-task-metrics, client-id = ([-. \ W] +), task-id = ([-. \ W] +) 

Обратите внимание, что с этим изменением, если пользователи предпочитают ждать бесконечно, задав для этой конфигурации значение Long.MAX, то мы можем полностью исключить неупорядоченность из-за синхронизации временных меток; но неупорядоченность по-прежнему может происходить из самих входных тем (представьте себе отдельный раздел, временные метки которого не увеличиваются монотонно, как их смещения). Одним из прямых следствий неупорядоченности является семантика оператора соединения:

Stream-Stream:

  • внутреннее соединение: вычисляет правильный результат
  • left join: может содержать «неправильный» leftRecord-null в результате
  • внешнее соединение: может содержать «неправильный» leftRecord-null или null-rightRecord в результате
  • все три варианта правильно обрабатывают записи вне порядка

Stream-Table (внутренний и левый):

  • синхронизация времени будет гарантировано установкой вышеуказанного значения конфигурации
  • записи с нарушением порядка не обрабатываются (т. е. мы не проверяем записи с нарушением порядка и просто обрабатываем все записи в порядке смещения): дает непредсказуемые результаты

Таблица -Таблица (внутренняя / левая / внешняя):

  • синхронизация времени будет гарантирована путем установки вышеуказанного значения конфигурации
  • записи вне порядка не обрабатываются (т. Е. Мы не проверяем записи вне порядка и просто обработать все записи в порядке смещения)
  • однако соединения Таблица-Таблица в конечном итоге согласованы

Наблюдаемые пользователем изменения были суммированы выше.

  • С точки зрения пользователя, это предложение содержит только дополнительную конфигурацию и метрики, поэтому оно должно быть совместимо с их существующим кодом.
  • Некоторое поведение обработки может быть изменено после того, как пользователи переопределят значения конфигурации по умолчанию, что приведет к различиям в производительности; но не должно приводить к проблемам несовместимости.

Пока нет отклоненных альтернатив.

Архитектура

— Проектный вопрос по синхронизации двух асинхронных потоков данных

Предположим, у меня есть два асинхронных потока — Поездка: {tripId, date, city} Bill: {billId, tripId, date, amount}. Мне нужно разработать систему для получения агрегированного представления в реальном времени следующего характера: City, TripCount, TotalAmount. События в обоих потоках могут быть не синхронизированы или дублироваться. Но результат должен быть точным и в реальном времени.

Мое решение:

1.) Создайте две разные таблицы БД: Trip и Bill (проиндексированные по TripID и BillID). Прочтите сообщения из потоков и сохраните их в этих таблицах со столбцом состояния как ожидающим в таблице счетов и таблице поездок. Затем напишите работника, который будет читать из таблицы счетов и искать в таблице Trip запись, содержащую данный tripID.Если запись будет найдена, она обновит агрегированное представление в третьей таблице (City, TripCount, TotalAmount). Затем изменим состояние счета и записи поездки на обработано . Будет выполняться одно фоновое задание на периодической основе, которое удалит все записи, состояние которых будет обработано как из таблицы Bill, так и из таблицы Trip.

Проблема, которую я вижу в вышеупомянутом решении, заключается в том, что индексация, выполненная на TripID и BillID, станет узким местом, если я буду удалять записи с очень высокой частотой.Помимо этого, вы, парень, видите любую другую проблему с этим решением. Я читал в Интернете, что люди предлагают это как очень известный анти-шаблон, потому что здесь я использую БД в качестве очереди.

2.) Вот другое решение: возьмите данные из потоков, сохраните их в таблицах: Поездка, Билл (для проверки записей и во избежание дублирования). Для временного хранения данных Поездки возьмите одну распределенную пару ключ-значение с очень быстрой структурой данных. Для этого я принимаю РЕДИС. Итак, после записи данных о поездке в БД, я буду писать те же данные в кеш с tripid в качестве ключа и записи в качестве значения.Затем я поставлю данные счета в очередь. Рабочие будут читать из очереди и искать tripid в кеше. Если трипид присутствует, рабочие будут читать данные из кеша и обновлять агрегированное представление и удалять трипид из кеша, а также выставлять счет из очереди. Если tripid не найден в кеше, то worker снова поместит то же сообщение в очередь.

Чтобы избежать дублирования, вставка не удастся, если мы попытаемся вставить в таблицы один и тот же tripid или billid.В случае, если вставка не удалась, я не буду помещать сообщение в очередь и кеш.

Эксперты, дайте мне знать, что вы думаете об этих двух решениях, и предложите лучшее решение, если оно у вас есть.

Что выбрать для синхронной и асинхронной связи — Redis Streams, Redis Pub / Sub, Kafka и т. Д.

Давайте поговорим об инструментах и ​​шаблонах общения. С появлением Streams в Redis у нас теперь есть еще один шаблон взаимодействия, который следует рассмотреть в дополнение к Redis Pub / Sub и другим инструментам, таким как Kafka и RabbitMQ.В этой статье я расскажу вам об определяющих характеристиках различных коммуникативных паттернов и кратко расскажу о наиболее популярных инструментах, используемых для реализации каждого из них. Наконец, я оставлю вам небольшой урок, который, надеюсь, поможет вам быстрее создавать лучшие решения.

Синхронная связь

В этом контексте синхронность означает, что все стороны должны быть активны одновременно, чтобы иметь возможность общаться. Самая простая форма — это служба A и служба B, выполняющие прямые удаленные вызовы процедур (RPC), например, путем вызова конечной точки HTTP REST службы B из службы A.Если служба B перейдет в автономный режим, служба A не сможет взаимодействовать с B, и поэтому A потребуется реализовать внутреннюю процедуру восстановления после сбоя, что в большинстве случаев означает постепенное снижение производительности. Например, в разделе «Посмотреть дальше» Netflix может отображаться случайная выборка шоу, если служба рекомендаций была недоступна.

Эти службы выполняют только постепенную деградацию, потому что для более чувствительных случаев использования (например, платежная служба, запрашивающая службу заказов для начала обработки оплаченного заказа), более распространены другие асинхронные механизмы, которые я опишу ниже. И хотя парадигма RPC хорошо работает для связи «один-к-одному», иногда вам может потребоваться поддержка «один-ко-многим» или «многие-ко-многим». На этом этапе у вас есть два основных варианта: инструменты без брокера или брокерские инструменты.

Без брокера

Без брокера означает, что участники по-прежнему подключены напрямую, но могут использовать другой шаблон, чем RPC. В этой категории у нас есть такие библиотеки, как ZeroMQ и более поздняя версия nanoMsg. Их по праву называют «сокетами TCP на стероидах».На практике вы импортируете библиотеку в свой код и используете ее для создания экземпляра соединения, которое может использовать различные встроенные механизмы маршрутизации сообщений, такие как Pub / Sub, Push / Pull, Dealer / Router и т. Д.

Посредник

Brokered означает, что участники подключаются к одной и той же службе, которая действует, как следует из названия, как центральный брокер для реализации всего механизма маршрутизации сообщений. Хотя эта архитектура обычно описывается как звездообразная, в которой брокер является центром звезды, сам брокер может быть (и часто является) кластерной системой.

Насколько мне известно, Redis Pub / Sub стоит особняком в этой категории. Вы по-прежнему можете использовать инструменты с постоянством, такие как NATS или RabbitMQ, для этого варианта использования, поскольку они позволяют отключать постоянство, но единственный брокер чисто синхронного обмена сообщениями, о котором я знаю, — это Redis. Разница не только в настойчивости, но и в общей идее надежной доставки (т. Е. Подтверждений на уровне приложения) по сравнению с запуском и забыванием. RabbitMQ по умолчанию использует прежнее поведение, в то время как Redis Pub / Sub фокусируется на выполнении минимального объема работы для запуска и забывания.Как вы понимаете, это сказывается на производительности (в конце концов, бесплатного обеда не бывает), но надежная доставка действительно применима к более широкому спектру сценариев использования.

Поскольку Streams не был доступен до Redis версии 5, некоторые люди предпочли использовать Pub / Sub в ситуациях, когда они предпочли бы более надежные гарантии доставки, и теперь переходят на него. Итак, если вы создаете новое приложение или вас не устраивает текущее, использующее Pub / Sub, подумайте о Redis Streams, если вам нужен «Pub / Sub, но с возможностью возобновления при отключении без потери сообщений.”

Плюсы и минусы

Инструменты

без брокера — это самый быстрый способ коммуникации, о котором вы только можете подумать, даже быстрее, чем Redis Pub / Sub. К сожалению, они не могут абстрагироваться от всей сложности, такой как необходимость для каждого участника знать местоположение всех остальных, чтобы подключиться к ним, или сложные сценарии сбоев, с которыми вам обычно не приходится иметь дело в брокерских системах (например, случай, когда участник умирает во время фанаута).

Прелесть использования Redis Pub / Sub в данном случае заключается в том, что не нужно отказываться от слишком большой пропускной способности и получить взамен простую, повсеместную инфраструктуру с небольшой поверхностью интеграции. Вам нужен только клиент Redis для вашего языка, и вы можете использовать PUBLISH и (P) SUBSCRIBE для перемещения сообщений.

Асинхронная связь

Конечно, асинхронность означает, что связь все еще может происходить, даже если не все участники присутствуют одновременно. Чтобы включить этот шаблон, постоянное сообщение является обязательным, в противном случае не было бы возможности гарантировать доставку в случае сбоя. Инструменты этой категории в основном состоят из решений на основе очередей или потоков.

Асинхронная связь на основе очередей

Это «традиционный» способ асинхронной связи и основа для большинства сервис-ориентированных архитектур (SOA). Идея состоит в том, что, когда сервису необходимо связаться с другим, он оставляет сообщение в центральной системе, которое другая служба получит позже. На практике эти почтовые ящики сообщений похожи на очереди задач.

Еще одно ожидание от этих систем — независимость задач друг от друга. Это означает, что они могут (и почти всегда) обрабатываться параллельно несколькими идентичными потребителями, обычно называемыми рабочими. Это свойство также обеспечивает независимый отказ, что является хорошей функцией для многих рабочих нагрузок. Например, невозможность обработать платеж от одного пользователя (может быть, из-за отсутствия информации профиля или других тривиальных проблем) не остановит весь конвейер обработки платежей для всех пользователей.

Самый известный инструмент в этой категории — RabbitMQ, за ним следует множество других инструментов и облачных сервисов, которые в основном используют AMQP (собственный протокол Rabbit) или MQTT (аналогичный открытый стандарт).Обычной практикой является использование RabbitMQ через фреймворки, которые предлагают простой способ реализации различных политик повторных попыток (например, экспоненциального отката и мертвого письма), а также подслащенный интерфейс, который делает обработку сообщений более идиоматической в ​​определенных клиентских экосистемах. Некоторые из этих фреймворков представляют собой «простые» очереди задач, такие как Sidekiq (Ruby), Celery (Python), Dramatiq (Python) и т. Д. Другие представляют собой «более серьезные» служебные шины предприятия (ESB), такие как NServiceBus (C #), MassTransit ( C #), Apache Synapse (Java) или Mulesoft (Java).

Более простая версия этого шаблона (очереди задач) также может быть реализована напрямую с помощью списков Redis. Redis имеет блокирующие и атомарные операции, которые упрощают создание индивидуальных решений. Особо следует упомянуть Kue, который использует Redis в изящной реализации очередей задач для JavaScript.

Асинхронная связь на основе потоков

Прежде всего, стоит отметить, что самый простой способ использования потоков — это просто форма хранения. Потоки представляют собой неизменяемую серию записей, ориентированных только на добавление, и многие типы данных естественным образом вписываются в этот формат.Например, показания датчиков или журналы содержат значения, которые по своей природе индексируются по времени создания и предназначены только для добавления (вы не можете изменить прошлое). У них также довольно регулярная структура (поскольку они, как правило, сохраняют тот же набор полей), свойство, которое потоки могут использовать для большей эффективности использования пространства. Этот тип данных хорошо вписывается в поток, потому что самый прямой способ доступа к данным — это получение заданного временного диапазона, что потоки могут делать очень эффективно.

Возвращаясь к нашему варианту использования связи, все реализации потоков также позволяют клиентам отслеживать поток, получая обновления в реальном времени по мере добавления новых записей.Иногда это называют наблюдением за потоком или подпиской на него. Самый простой способ использовать Streams в качестве инструмента связи — отправить в поток то, что вы в противном случае опубликовали бы через Pub / Sub, в основном создавая возобновляемый Pub / Sub. Каждому подписчику просто нужно запомнить последний обработанный идентификатор записи, чтобы его можно было легко возобновить в случае сбоя или отключения.

Также возможно — а иногда и предпочтительнее — реализовать связь между сервисами через потоки, входя в область потоковых архитектур .Основная идея здесь заключается в том, что то, что мы ранее описывали как задачи / сообщения, теперь будет событием. При проектировании на основе очередей задачи помещаются в очередь службы другой службой, которая хочет, чтобы она что-то сделала, но в потоковой архитектуре происходит обратное: каждая служба отправляет обновления состояния в свой собственный поток, что, в свою очередь, наблюдается другие услуги.

Это изменение дизайна имеет множество тонких последствий. Например, вы можете добавить новые службы позже и заставить их просматривать всю историю потоков.В очередях это невозможно, потому что задачи удаляются после завершения, и способ, которым общение обычно выражается в этих системах, теперь позволяет это (подумайте, императивное или функциональное).

Потоки имеют двойную природу: структуру данных и схему связи. Некоторые данные естественно вписываются в это (например, журналы), и обмен данными между сервисами не обязательно должен основываться на очередях задач. Практика полного принятия этой двойственной природы называется поиском событий.

С помощью источника событий вы определяете свои бизнес-модели как бесконечный поток событий и позволяете бизнес-логике и другим службам реагировать на него.Сделать такой перевод не всегда легко, но преимущества могут быть огромными при решении сложных вопросов, таких как «каково было состояние объекта X в момент Y?», На которые в противном случае было бы очень сложно ответить или прямо невозможно без надлежащего аудита. протоколирование.

Может быть, вашему обычному мобильному приложению не нужен источник событий, но для корпоративного программного обеспечения, которое имеет дело с личными данными клиентов, доставкой и другими «беспорядочными» доменами, это может оказаться большим подспорьем.

Для реализации такого рода шаблонов вы можете использовать множество инструментов. Конечно, мы должны начать со слона в комнате: Apache Kafka, а также альтернатив, таких как Apache Pulsar (от Yahoo) и повторных реализаций Kafka на других языках, а также нескольких предложений SaaS. Наконец, есть новинка: Redis Streams.

Apache Kafka против Redis Streams

Прежде всего, обратите внимание, что то, что Redis называет «потоком», Kafka называет «разделом темы», а в Kafka потоки — это совершенно другая концепция, которая вращается вокруг обработки содержимого темы Kafka.

Тем не менее, с точки зрения выразительности, обе системы эквивалентны: вы можете реализовать одно и то же приложение на любой из них без каких-либо существенных изменений в том, как вы моделируете свои данные. Различия начинаются, когда вы погружаетесь в практические детали, а их много и они существенны.

Kafka существует уже давно, и люди успешно создали надежные потоковые архитектуры, в которых он является единственным источником истины. Однако, если вы готовы опробовать новую технологию, цените простоту как в разработке, так и в эксплуатации, и вам нужна задержка менее миллисекунды, тогда Redis Streams может заполнить очень похожее место в вашей архитектуре.

Когда я говорю «простота», я имею в виду именно это. Если вы никогда не пробовали Redis Streams, даже если вы планируете использовать Kafka в производственной среде, я предлагаю вам попробовать создать прототип своего приложения с помощью Redis Streams, так как буквально пара минут встает и запускается на вашем ноутбуке.

Варианты использования для каждого шаблона

Давайте рассмотрим несколько примеров, чтобы увидеть, какие проблемы лучше всего решает каждый шаблон.

Синхронная связь без брокера

Если вы пытаетесь создать пару клиентских устройств (например,g., телефон, Arduino) разговаривают по локальной сети друг с другом или с программой, работающей на компьютере, кратчайший путь к рабочему решению — это, вероятно, «TCP-соединение на стероидах».

Синхронная связь через посредник

Чат-приложение в стиле IRC (т. Е. Без истории) или конвейер обработки в реальном времени изменчивых журналов / событий в режиме plug-and-play хорошо работают с брокерским подходом. Бенджамин Сержант рассказал об этом последнем примере использования на RedisConf19 в Сан-Франциско (слайды).

Асинхронная связь на основе очередей

Поисковые роботы

очень часто полагаются на этот шаблон, а также на многие веб-службы с операциями, которые не могут быть выполнены немедленно в ответ на запрос.Подумайте, например, о кодировании видео в YouTube.

Асинхронная связь на основе потоков

Этот метод лучше всего подходит для обработки журналов, устройств и микросервисов Интернета вещей (IoT) в дополнение к чат-приложениям в стиле Slack (т. Е. С историей).

Redis против мира

Перед тем, как завершить, я хочу оставить вас с одним последним соображением. Настоящая сверхмощность Redis заключается в том, что это не , а просто , система обмена сообщениями Pub / Sub, очередь или потоковая служба.Это также не просто база данных общего назначения. На самом деле, проявив достаточно настойчивости, вы могли бы реализовать каждый описанный выше шаблон поверх реляционной СУБД, но есть практические причины, по которым это было бы плохой идеей.

Redis предлагает настоящую систему «запустил и забыл» Pub / Sub, а также настоящий Stream типа данных. Кроме того, с модулями Redis Redis также поддерживает реальные реализации многих различных типов данных. Позвольте мне сопоставить это утверждение с нашими вариантами использования постоянных и непостоянных приложений чата.

Приложение чата в IRC-стиле

Выше я пришел к выводу, что Pub / Sub был бы правильным выбором, поскольку этот тип чат-приложения должен только отправлять сообщения подключенным клиентам. Однако для реализации даже простой версии этого приложения вам все равно придется подумать о глобальном списке каналов и списке присутствия пользователей для каждого канала. Где вы храните это состояние? Как поддерживать его в актуальном состоянии, особенно если экземпляр службы неожиданно умирает? В Redis ответ прост: отсортированные наборы, ключи с истекающим сроком действия и атомарные операции.Если бы вы использовали RabbitMQ, вам потребовалась бы СУБД.

Чат в стиле Slack

Беседы могут быть очень естественно выражены как поток сообщений. Нам даже не нужно добавлять сюда источники событий, так как это уже собственная структура этого типа данных. Так почему Redis вместо Kafka в этом примере? Как и раньше, ваша система чата не будет просто потоком сообщений. Есть каналы и другие состояния, которые лучше всего представлены по-разному. Также есть функция «Пользователь X печатает…»: эта информация непостоянна, вы хотите отправить ее всем участникам, но только когда они подключены.Если бы вы использовали Kafka, вам бы все равно пришлось развернуть систему Pub / Sub.

На вынос

В распределенных системах, когда вам нужна координация, вам часто требуется разделяемое состояние, и наоборот. Игнорирование этого факта довольно часто может привести к слишком сложным решениям. Redis прекрасно это понимает, и это одна из причин его уникального дизайна. Если вы примете этот принцип, вы обнаружите, что сложные проблемы иногда можно решить с помощью нескольких команд при наличии правильных примитивов, и это именно то, что Redis дает вам.Например, так вы можете транзакционным способом добавить запись в поток, поместить задачу в (начало) очереди и опубликовать в Pub / Sub:

MULTI
XADD журналы: service1 * level error req-id 42 stack-trace "..."
LPUSH actions-queue "RESTART = service1"
PUBLISH live-notifs "Новое событие ошибки в service1!"
EXEC

Чтобы найти рабочее решение, вам нужно победить семь злобных проблем параллелизма.
Сюжет о Redis Pilgrim vs.Мир

Надеюсь, это даст вам понимание основных шаблонов коммуникации, которые обычно используются в распределенных системах. В следующий раз, когда вам понадобится соединить две службы вместе, это должно помочь вам сориентироваться в вариантах. Если вам нравится говорить о TCP-соединениях на стероидах и потоковых архитектурах, не стесняйтесь обращаться ко мне в Twitter @croloris.

Тип данных Redis Streams — отличная особенность Redis, он станет строительным блоком для многих приложений, особенно теперь, когда Redis имеет пул модулей, которые добавляют новые полноценные возможности для временных рядов, графиков и поиска.Чтобы узнать больше о Redis Streams, ознакомьтесь с этим вводным сообщением в блоге Antirez, а также с официальной документацией. Но не забывайте, что потоки не являются подходящим инструментом для на каждое задание : иногда вам нужны Pub / Sub или просто скромные операции блокировки в списках Redis (или в сортированных наборах, у Redis это тоже есть).

10 вопросов, на которые вы должны получить ответы

Реактивное программирование (RP) не является чем-то новым и передовым, когда дело доходит до разработки приложений.Скорее всего, вы уже слышали об этом. Этот термин был впервые введен в мир информационных технологий в 1960-х годах, и с тех пор о нем много говорилось и писалось. К сожалению, как это часто бывает, новая концепция быстро породила вокруг себя множество неверных толкований, и продолжает это делать по сей день. Реактивный манифест 2014 года, который представил «реактивные системы» и их четыре «священных принципа», еще больше испортил все. Итак, давайте попробуем прояснить, что есть что, и понять, почему и где нам нужно реактивное программирование при разработке приложений Java (если это действительно так).

# 1 Что такое реактивное программирование в двух словах?

Реактивное программирование — это парадигма программирования, которая имеет дело с асинхронными потоками данных (последовательностями событий) и конкретным распространением изменений, что означает, что она реализует модификации среды выполнения (контекста) в определенном порядке.

# 2 Что это за «конкретное распространение изменений»?

Вот пример из жизни. Скажем, сегодня пятница, и Джон хочет провести этот вечер со своим другом Бобом, готовя пиццу и просматривая один из эпизодов «Звездных войн».Обрисуем варианты, которые у него есть.

  1. Джон заканчивает свою работу. Потом идет, заказывает пиццу и ждет, пока она не будет готова. Затем забирает своего друга. И наконец (с Бобом и пиццей) добирается до дома и приступает к фильму. Это будет подход sync , и он будет слишком длинным, так что, вероятно, Джон захотел бы отменить этот подход к тому времени.
  2. Джон заказывает пиццу онлайн, звонит Бобу, приглашает его прийти. Он идет домой, получает пиццу и начинает смотреть фильм (и ест пиццу), не дожидаясь появления Боба.Вот что может случиться с подходом async .
  3. Джон заказывает пиццу, звонит Бобу, приглашает его прийти, идет домой и получает пиццу. Но на этот раз он ждет, пока не придет Боб, и только после этого включает фильм. Это и есть реактивный подход . Вы ждете, пока все асинхронные действия (изменения) будут завершены, а затем переходите к дальнейшим действиям.
# 3 Реактивное программирование и реактивные системы — это одно и то же?

Нет, это не так.Хотя эти термины часто используются как синонимы, они не являются синонимами и отражают разные вещи.

Реактивные системы представляют собой следующий уровень «реактивности». Этот уровень подразумевает конкретные решения design, и , архитектурные , которые позволяют создавать устойчивые, гибкие и отзывчивые приложения.

Вам не обязательно использовать реактивное программирование в реактивных системах, но это хорошая идея, так как комбинация приносит еще больше преимуществ вашему приложению, поскольку делает их еще более слабосвязанными, позволяет более эффективно использовать ресурсы, они более отзывчивы и обеспечивают меньшую задержку.

# 4 Зачем нам нужна «реактивность» в Java?

Когда дело доходит до огромных объемов данных или многопользовательского режима , нам часто требуется асинхронная обработка, чтобы сделать наши системы быстрыми и отзывчивыми. В Java, представляющей старое объектно-ориентированное программирование, асинхронность может стать действительно неприятной и затруднить понимание и поддержку кода. Таким образом, реактивное программирование особенно полезно для этой «чисто» объектно-ориентированной среды, поскольку оно упрощает работу с асинхронными потоками.

# 5 Как перейти в реактивную среду на Java?

В своих последних выпусках (начиная с Java 8) сама Java предприняла несколько попыток ввести встроенную реактивность , но до сих пор эти попытки не очень популярны среди разработчиков. Но есть несколько действующих и регулярно обновляемых сторонних реализаций для реактивного программирования на Java, которые помогают сэкономить время и поэтому особенно любимы и лелеют Java-разработчиками.

RxJava был первым API реактивного расширения, специфичным для платформы Java.Он работает с Java 6 и дает возможность писать асинхронные программы, основанные на событиях, как для Java, так и для Android Java, что очень удобно.

Spring Reactor — еще один фреймворк для Java от разработчиков Spring. Он очень похож на RxJava, но имеет более простую абстракцию. Фреймворку удалось завоевать популярность благодаря возможности использовать преимущества Java 8.

# 6 Что я получаю с RP в реальной жизни?

Повышенная производительность — благодаря возможности быстро и стабильно обрабатывать огромные объемы данных.

Улучшенный UX — благодаря возможности сделать приложение более отзывчивым к своему пользователю.

Упрощенные модификации и обновления — за счет более читаемого и легкого предсказуемого кода.

# 7 Должен ли я сделать все это реактивным?

«Реактивные» компоненты могут быть беспрепятственно введены в приложение, как его часть, поэтому нет необходимости изменять всю модель программирования проекта, саботировать другие проверенные стили программирования, полностью придерживаться «реактивности» и вносить ненужные сложности.Мера — это сокровище. Например, если это простой веб-сайт, вряд ли нужно писать его реактивным. Но как только вы захотите обновить его и ввести систему рекомендаций, реактивный код будет хорошей идеей для работы с большой нагрузкой больших данных.

# 8 Когда использовать RP?

Переход на реактивность обеспечивает элегантное решение, когда дело доходит до определенных типов многопользовательских приложений с высокой нагрузкой или :

  • Социальные сети, чаты
  • Игры
  • Аудио и видео приложения

И к следующим компонентам любого типа приложения :

  • Серверный код, обслуживающий интерактивные элементы пользовательского интерфейса
  • Прокси-серверы, балансировщики нагрузки
  • Искусственный интеллект, машинное обучение
  • Потоковая передача данных в реальном времени

Хотите увидеть пример из жизни? Посмотрите наш недавний проект Java, в котором мы применили реактивное программирование.

Посмотреть проект

# 9 Когда не использовать RP?

Проще говоря, не пытайтесь обращаться к RP там, где в этом нет необходимости, например где нет «живых» данных, высокая нагрузка или большое количество пользователей, которые меняют данные одновременно.

# 10 Что нужно, чтобы начать реактивное программирование на Java?

К сожалению, простое изучение теории и загрузка фреймворков не окажут большого влияния.Java-разработчику требуется немало времени, усилий и практического опыта, чтобы заставить повелительный ум привыкнуть к другому уровню абстракции. Вот почему, если вы решите принять «реактивность», лучше обратиться к хорошему консультанту с правильным пониманием подхода, который сможет определить, где реактивные компоненты будут полезны для вашей системы и как вы можете легко ввести их в нем.

Ключ

«Реактивное программирование» больше не модное слово, но все еще четко не определено.Мы надеемся, что в следующий раз, когда вы столкнетесь с этим термином, вы не будете сбиты с толку, поскольку теперь вы знаете, что это всего лишь еще один стиль кодирования, в основе которого лежит эффективное управление изменениями с помощью асинхронных потоков данных. RP имеет особое значение для объектно-ориентированной Java, где асинхронность часто приводит к тому, что код трудно понять и поддерживать. RP может быть сложной задачей, так как требует от разработчика Java осознания совершенно нового стиля программирования. Однако в случае успеха усилия не будут потрачены зря, поскольку это позволяет приложениям быть более устойчивыми к высокой нагрузке и значительно улучшает UX, делая их более отзывчивыми.Тем не менее, также важно не реагировать на слишком остро. Бездумное использование реактивного подхода без реальной необходимости просто испортит приложение из-за ненужной сложности.

Я ХОЧУ УЗНАТЬ БОЛЬШЕ

Java-разработка и консалтинг от ScienceSoft

Планируете создать проект на основе Java? Воспользуйтесь опытом наших преданных своему делу архитекторов и разработчиков, чтобы раскрыть весь потенциал среды Java.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *