Разное

Потоки python 3: Введение в потоки в Python

Содержание

Многопоточность в Python

Многопоточность — это основная концепция программирования, которую поддерживают почти все языки программирования высокого уровня. В этом уроке по многопоточности в Python мы рассмотрим различные методы создания потоков и реализации синхронизации.

Давайте сначала узнаем, что такое поток и что означает многопоточность в компьютерных науках.

Что такое Thread в информатике?

В программировании поток — это наименьшая единица выполнения с независимым набором инструкций. Он является частью процесса и работает в таких же исполняемых ресурсах программы совместного использования контекста, как память. Поток имеет начальную точку, последовательность выполнения и результат. Он имеет указатель инструкций, который хранит текущее состояние потока и контролирует, что будет выполнено в следующем порядке.

Что такое Multithreading в информатике?

Способность процесса выполнять несколько потоков параллельно называется многопоточностью. В идеале многопоточность может значительно улучшить производительность любой программы. И механизм многопоточности Python довольно удобен для пользователя, который вы можете быстро освоить.

Плюсы многопоточности:
  • Multithreading может значительно повысить скорость вычислений в многопроцессорных или многоядерных системах, поскольку каждый процессор или ядро обрабатывает отдельный поток одновременно.
  • Multithreading позволяет программе оставаться отзывчивой, пока один поток ожидает ввода, а другой одновременно запускает графический интерфейс. Это утверждение справедливо как для многопроцессорных, так и для однопроцессорных систем.
  • Все потоки процесса имеют доступ к его глобальным переменным. Если глобальная переменная изменяется в одном потоке, она видна и другим потокам. Поток также может иметь свои собственные локальные переменные.
Минусы многопоточности:
  • В однопроцессорной системе многопоточность не влияет на скорость вычислений. Фактически производительность системы может снизиться из-за накладных расходов на управление потоками.
  • Синхронизация необходима, чтобы избежать взаимного исключения при доступе к общим ресурсам процесса. Это напрямую ведет к увеличению использования памяти и процессора.
  • Многопоточность увеличивает сложность программы, что также затрудняет отладку.
    Это повышает вероятность потенциальных ошибок.
  • Это может вызвать голод, когда поток не получает регулярный доступ к общим ресурсам. Тогда он не сможет возобновить свою работу.

Python модули

Python предлагает два модуля для реализации потоков в программах.

Для вашей информации, модуль thread устарел в Python 3 и переименован в модуль _thread для обратной совместимости. Но мы объясним оба метода, потому что многие пользователи все еще используют устаревшие версии Python.

Основное различие между этими двумя модулями состоит в том, что модуль реализует поток как функцию. С другой стороны, модуль предлагает объектно-ориентированный подход, позволяющий создавать потоки.

Как использовать модуль Thread для создания потоков?

Если вы решили применить модуль thread в своей программе, используйте следующий метод для создания потоков.

thread.start_new_thread ( function, args[, kwargs] )

Этот метод является довольно простым и эффективным способом создания потоков. Вы можете использовать его для запуска программ в Linux и Windows.

Этот метод запускает новый поток и возвращает его идентификатор. Он вызовет функцию, указанную в качестве параметра «function» с переданным списком аргументов. Когда возвращается функция, поток молча завершает работу.

Здесь args — это кортеж аргументов; используйте пустой кортеж для вызова без каких-либо аргументов. Необязательный аргумент указывает словарь аргументов с ключевыми словами.

Если функция завершается с необработанным исключением, выводится трассировка стека, а затем поток выходит (это не влияет на другие потоки, они продолжают работать). Используйте приведенный ниже код, чтобы узнать больше о многопоточности.

Базовый пример Multithreading Python

# Пример многопоточности Python.
# 1. Рассчитать факториал с помощью рекурсии.
# 2. Вызовите факториальную функцию, используя поток.

from thread import start_new_thread

threadId = 1

def factorial(n):
  global threadId
  if n 

Вы можете запустить приведенный выше код в своем локальном терминале или использовать любой онлайн-терминал. Как только вы запустите эту программу, она выдаст следующее.

Waiting for threads to return...
Thread: 1
1! = 1
2! = 2
3! = 6
4! = 24
Thread: 2
1! = 1
2! = 2
3! = 6
4! = 24
5! = 120

Как использовать модуль Threading для создания потоков?

Последний модуль threading предоставляет богатые возможности и большую поддержку потоков, чем устаревший модуль thread, описанный в предыдущем разделе. Модуль threading является отличным примером многопоточности Python.

Модуль объединяет все методы модуля thread и предоставляет несколько дополнительных методов.

  • threading.activeCount(): находит общее число активных объектов потока.
  • threading.currentThread(): его можно использовать для определения количества объектов потока в элементе управления потоком вызывающей стороны.
  • threading.enumerate(): он предоставит вам полный список объектов потока, которые в данный момент активны.

Помимо описанных выше методов, модуль также представляет класс Thread, который вы можете попробовать реализовать в потоках. Это объектно-ориентированный вариант многопоточности Python.

Класс имеет следующие методы.

Методы класса Описание метода
run(): Это функция точки входа для любого потока.
start(): запускает поток при вызове метода run.
join([time]): позволяет программе ожидать завершения потоков.
isAlive(): проверяет активный поток.
getName(): извлекает имя потока.
setName(): обновляет имя потока.

При желании вы можете обратиться к родной документации Python, чтобы глубже изучить функциональность модуля threading.

Шаги для реализации потоков с помощью модуля Threading

Вы можете выполнить следующие шаги для создания нового потока с помощью модуля .

  • Создайте класс наследовав его от Thread.
  • Переопределите метод __ init __ (self [, args]) для предоставления аргументов в соответствии с требованиями.
  • Затем переопределите метод run(self [, args]), чтобы создать бизнес-логику потока.

Как только вы определили новый подкласс Thread, вы должны создать его экземпляр, чтобы начать новый поток. Затем вызовите метод для его запуска. В конечном итоге он вызовет метод для выполнения бизнес-логики.

Пример — создание класса потока для печати даты

# Пример многопоточности Python для печати текущей даты.
# 1. Определите подкласс, используя класс Thread.
# 2. Создайте подкласс и запустите поток.

import threading
import datetime

class myThread (threading.Thread):
   def __init__(self, name, counter):
       threading.Thread.__init__(self)
       self.threadID = counter
       self.name = name
       self.counter = counter
   def run(self):
       print("Starting " + self.name)
       print_date(self.name, self.counter)
       print("Exiting " + self.name)

def print_date(threadName, counter):
   datefields = []
   today = datetime.date.today()
   datefields.append(today)
   print(
      "%s[%d]: %s" % ( threadName, counter, datefields[0] )
   )

# Создать треды
thread1 = myThread("Thread", 1)
thread2 = myThread("Thread", 2)

# Запустить треды
thread1.start()
thread2.start()

thread1.join()
thread2.join()
print("Exiting the Program!!!")

Python Multithreading — синхронизация потоков

Модуль имеет встроенную функциональность для реализации блокировки, которая позволяет синхронизировать потоки. Блокировка необходима для контроля доступа к общим ресурсам для предотвращения повреждения или пропущенных данных.

Вы можете вызвать метод Lock(), чтобы применить блокировки, он возвращает новый объект блокировки. Затем вы можете вызвать метод захвата (блокировки) объекта блокировки, чтобы заставить потоки работать синхронно.

Необязательный параметр блокировки указывает, ожидает ли поток получения блокировки.

  • В случае, если блокировка установлена на ноль, поток немедленно возвращается с нулевым значением, если блокировка не может быть получена, и 1, если блокировка получена.
  • В случае, если для блокировки задано значение 1, поток блокируется и ожидает снятия блокировки.

Метод release() объекта блокировки используется для снятия блокировки, когда она больше не требуется.

Просто для вашей информации, встроенные в Python структуры данных, такие как списки, словари, являются поточно-ориентированными, что является побочным эффектом наличия атомарных байт-кодов для управления ими. Другие структуры данных, реализованные в Python или базовые типы, такие как целые числа и числа с плавающей запятой, не имеют такой защиты. Для защиты от одновременного доступа к объекту мы используем объект Lock.

Пример блокировки в многопоточности

Пример многопоточности Python для демонстрации блокировки. 
# 1. Определите подкласс, используя класс Thread. 
# 2. Создайте подкласс и запустите поток. 
# 3. Реализуйте блокировки в методе выполнения потока.

import threading
import datetimeexit

Flag = 0

class myThread (threading.Thread):
   def __init__(self, name, counter):
       threading.Thread.__init__(self)
       self.threadID = counter
       self.name = name
       self.counter = counter

   def run(self):
       print("Starting " + self.name)

       # Получить блокировку для синхронизации потока
       threadLock.acquire()
       print_date(self.name, self.counter)

       # Снять блокировку для следующего потока
       threadLock.release()
       print("Exiting " + self.name)

def print_date(threadName, counter):
   datefields = []
   today = datetime.date.today()
   datefields.append(today)
   print(
      "%s[%d]: %s" % ( threadName, counter, datefields[0] )
   )

threadLock = threading.Lock()
threads = []

# создать треды
thread1 = myThread("Thread", 1)
thread2 = myThread("Thread", 2)

# Запустить треды
thread1.start()
thread2.start()

# Добавить треды в список
threads.append(thread1)
threads.append(thread2)

# Дождитесь завершения всех потоков
for t in threads:
   t.join()

print "Exiting the Program!!!"

Подведем итоги — многопоточность Python для начинающих

Мы хотели бы, чтобы этот урок Python Multithreading был очень интересным и интуитивно понятным. Кроме того, мы добавили в этой статье несколько иллюстраций, которые, несомненно, помогут вам улучшить свои навыки работы с Python.

Мы также рекомендуем вам потренироваться с новыми многопоточными заданиями для получения лучших результатов, которые повысят ваши знания и навыки программирования на Python.

Основы работы с потоками в языке Python / Хабр

Предисловие

Данную статью я затеял написать после учащающихся вопросов как на форуме так и вопросов в icq на тему многопоточности в CPython. Проблема людей, которые их задают происходит, в основном, из незнания или непонимания основных принципов работы многопоточных приложений. По крайней мере, это относится к используемой мной модели многопоточности, которая носит название Thread Pool (Пул потоков). Часто встречаемой проблемой является и другое: люди не имеют элементарных навыков работы со стандартными модулями CPython. В статья я постараюсь привести примеры такого незнания, не останавливаясь на личностях, так как это по моему скромному мнению неважно. Исходя из условий, в которых пишется эта статья, то мы немного затронем и работу через proxy серверы (не путать с SOCKS).

Детали


Статья писалась в то время когда последними версиями CPython были: 2.6.2 для второй ветки и 3.1.1 для третьей ветки. В статье используются новоявленные в CPython 2.6 with statements, следовательно при желании ипользования этого кода в более ранних версиях придется переделывать код на своё усмотрение. В процессе написания данной статьи использовались только стандартные модули, доступные «из коробки». Также, исходя из того, что я являюсь не профессиональным программистом, а самоучкой, то прошу извинения у уважаемой аудитории за возможные неточности относительно трактования тех или иных понятий. Поэтому, приглашаю вас задавать вопросы, на которые я по возможности буду отвечать.

Итак, приступим, фактически, то что я собрался описывать впервые было порекомендовано уважаемым lorien с python.su (правда в его примере Queue вообще в отдельном потоке обрабатывалось :)), не уверен что он автор продемонстрированного им концепта, но впервые я увидел это опубликованным именно от него, и являет собой даже скорее не Thread Pool, а Task Pool (хотя возможно я и не прав в трактовании сего термина).
Что представляет собой многопоточное приложение? Это приложение, в котором определенное количество потоков выполняют некие задачи. Беда многих в том, что они не до конца улавливают то, что потоки действуют отдельно друг от друга до тех пор, пока активен главный поток. Лично я стараюсь писать таким образом, чтобы это им не мешало, но об этом позже. Также их проблемой является так называемый «индусский» код, который просто и бездумно откуда-то копируется, а программа доводится до уровня «лишь бы работало». Господа, усвойте раз и навсегда: если вы не понимаете, как работает тот или иной участок вашей программы, то перепишите его так, чтобы это было понятно ВАМ, если в будущем вы дорастете до понимания тех вещей, которые вы предполагали бездумно скопировать, то вам без проблем можно будет использовать этот код. Главным является именно ВАШЕ понимание того, как работает ваше творение.

Затронем проблему отдельной работы потоков. Господа, взаимодействие потоков стоит продумывать до того как вы начинаете писать приложение, а не когда вы его уже написали. В принципе, если придерживаться некоторых правил работы с исходным кодом приложения, то переделывание программы из однопоточной в многопоточную происходит легко, безболезненно, и быстро.

Касательно активности главного потока. Когда, как вам кажется, вы запускаете ОДИН поток, фактически работает уже ДВА потока. Нужно понимать, что количество потоков, активных в данный момент равняется количеству потоков, запущенных в данный момент вами +1 поток, в котором работает основное тело приложения. Лично я стараюсь писать таким образом, чтобы четко отделять основной поток от запущенных мной. Если этого не делать, то возможно преждевременное (как вам кажется) завершение работы приложения, хотя на самом деле приложение отработает именно так, как вы его написали.

Вроде на словах понятно, теперь приступаем к практике. На практике в CPython есть такое понятние как GIL (Global Interpreter Lock). Под сим подразумевается глобальная блокировка интерпритатора в тот момент когда потоки вашего приложения обращаются к процессору. Фактически, в каждый отдельно взятый момент с процессором работает только один поток. В связи с этим максимальное количество потоков, которое вообще можно запустить в стандартном CPython колеблется в районе 350 штук.

В качестве примера будет сделана попытка реализовать многопоточный парсер www.google.com. Как я уже написал выше, для работы будут использованы исключительно стандартные модули, для выполнения задачи понадобятся модули urllib2, urllib, queue, threading, re.

По порядку:

#==================<Имортирование необходимых модулей>==================
import urllib2
#Модуль для работы с протоколом HTTP, высокоуровневый
import urllib
#Модуль для работы с протоколом HTTP, более низкоуровневый чем urllib2, 
#фактически из него необходима одна функция - urllib.urlquote
from Queue import Queue
#Модуль, который представляет собой "Pool", фактически это список, в 
#котором на нужных местах вставлены замки таким образом, чтобы к нему 
#одновременно мог обращаться только один поток
import threading
#Модуль для работы с потоками, из него понадобится только 
#threading.active_count, threading.Thread, threading.Thread.start, 
#threading.Rlock
import re
#Модуль для работы с регулярными выражениями, его использование выходит
#за пределы статьи
import time 
#Модуль для работы со временем, из него нужна только функция sleep

queue = Queue()
#Обязательное присваивание, нужно делать именно так (т.е. импортировать
#класс Queue из модуля Queue и инициализировать его)
#==================</Имортирование необходимых модулей>=================

#==============================<Настройки>==============================

PROXY = «10.10.31.103:3128»
#Во время написания статьи сижу за прокси-сервером, поэтому в статье 
#затрагивается и этот вопрос, этой строкой обьявляется глобальная
#переменная PROXY, в которой находится адрес прокси-сервера. Для работы 
#напрямую необходимо указать значение None

HEADERS = {«User-Agent» : «Opera/9.64 (Windows NT 5.1; U; en) Presto/2.1.1»,

           «Accept» : «text/html, application/xml;q=0.9, application/xhtml+xml, image/ png, image/jpeg, image/gif, image/x-xbitmap, */*;q=0.1»,

           «Accept-Language» : «ru,uk-UA;q=0.9,uk;q=0.8,en;q=0.7»,

           «Accept-Charset» : «iso-8859-1, utf-8, utf-16, *;q=0.1»,

           «Accept-Encoding» : «identity, *;q=0»,

           «Connection» : «Keep-Alive»}
#Для того чтобы получить страницу с www.google.com НЕОБХОДИМО использовать
#заголовки браузера, они представлены выше в ассоциативном массиве HEADERS, 
#соответствуют реальным заголовкам браузера Opera с маленько модификацией, эти 
#заголовки означают что клиент не может принимать zlib compressed data, т.е. 
#сжатые данные — не хотел я заморачиваться еще и с разархивироанием страниц, тем 
#более что не все сайты их сжимают…

THREADS_COUNT = 10
#В принципе это все настройки приложения, это-количество потоков

DEEP = 30
#Это — значение, которое отвечает за глубину страниц поиска, которые 
#нужно просматривать, фактически же определяет собой количество ссылок, 
#которые будут собраны сборщиком.

ENCODING = «UTF-8»
#Кодировка ваших файлов (для загрузки данных из файла с запросами и 
#последующего их перевода в юникод)
#==============================</Настройки>===================================

LOCK = threading.RLock()
# Вот тут то впервые и затрагивается модуль threading
#создается обьект LOCK, который представляет собой класс threading.RLock из
#модуля threading, это -простейший замок, который запрещает исполнение 
#несколькими потоками участка кода который идет после вызова его метода 
#acquire() Основным отличием threading.RLock от threading.Lock (тоже класс из 
#модуля threading) является то, что каждый поток может обращаться к обьекту 
#threading.RLock неограниченное количество раз, обьект threading.Lock может 
#вызываться каждым потоком только единожды.

Основным принципом для беспроблемной реализации многопоточности я считаю модульность кода. Не обязательно выносить используемые Вами функции в отдельные файлы или классы, достаточно чтобы хотя бы это были отдельные функции (Прошу простить за каламбур).

Сейчас я выделю работу потока в отдельную функцию, пускай она будет представлять собой некое абстрактное понятие работы. На данном этапе пока что некоторые моменты будут непонятны, но позже все это выстроится в понятный алгоритм.

def worker():
# Обьявление функции worker, входных аргументов нет

    global queue

    #Здесь и далее я буду обьявлять функции из глобального пространства 

    #имен в локальном для лучшей читабельности кода, хотя в написании

    #софта такое делать строго не рекомендую (!)

    while True:

    #Запуск бесконечного цикла, в котором будет происходить работа

        try:

        #Обработка ошибок, блок try/except, когда обработается

        #ошибка QueueEmpty это значит, что список задач пуст, и поток 

        #должен завершить свою работу

            target_link =  queue.get_nowait() 

            #Эта строчка олицетворяет собой получение задачи потоком из

            #списка задач queue

        except Exception, error:

        #сам перехват ошибки

            return

            #Завершение работы функции

        parsed_data = get_and_parse_page(target_link)

        #Позже будет реализована функция, которая будет получать 

        #страницу и доставать из нее необходимые значения

        if parsed_data != "ERROR":

        #Проверка на то, была ли получена страница

            write_to_file(parsed_data)

            #Также будет реализована функция для записи собранных данных в файл

        else:

            queue.put(target_link)

            #Если страница не была получена, то забрасываем ее обратно в queue




Главное, что нужно четко усвоить — это алгоритм работы самого потока, и что именно потоки должны обрабатывать независимо друг от друга. Итого, задачи потока очень просты — получить ссылку на страницу поиска, передать ее в функцию-обработчик, из которой вернутся ссылки на найденные сайты а также title этих сайтов, после записать ссылки и title в файл (все это будет находиться в parsed_data).

По мере работы реализовываются задачи от простейшей к сложной, поэтому на этих этапах практически не затронуты вопросы многопоточности. Далее настала очередь реализации функции, которая будет отвечать за запись в файл.

def write_to_file(parsed_data):
#Обявление функции write_to_file, аргумент –массив данных для записи

    global LOCK

    global ENCODING

    LOCK.acquire()

    #"Накидывание замка", следующий далее участок кода может выполнятся

    #только одним потоком в один и тот же момент времени

    with open("parsed_data.txt", "a") as out:

    #Используется with statement, открывается файл parsed_data.txt с

    #правами "a", что означает дозапись в конец файла, и присваиваевается

    #хэндлеру на файл имя out (я так привык)

        for site in parsed_data:

        #Проход циклом по всем элементам parsed data, имя активного в 

        #данный момент элемента будет site

            link, title = site[0], site[1]

            #Присваивание переменным link и title значений из кортежа site

            title = title.replace("<em>", "").replace("</em>", "").replace("<b>", "").replace("</b>", "")

            #.replace -это замена HTML-тэгов, которые проскакивают в title и совершено не нужны

            out.write(u"{link}|{title}\n".format(link=link, title=title).encode("cp1251"))

            #Производится сама запись в файл, используется оператор форматирования 

            #строк .format, в отличие от % он поддерживает именованные аргументы, чем я и не 

            #преминул воспользоваться, таким образом в файл пишется строка вида:

            #ссылка на сайт | title страницы\n -символ переноса строки(все это переводится

            #из юникода в cp1251)

    LOCK.release()

    #"Отпирание"  замка, в противном случае ни один из следующих 

    #потоков не сможет работать с этим участком кода. По-хорошему, тут тоже нужно 

    #сделать обработку ошибок, но это учебный пример, да и ошибка там может 

    #возникнуть (после добавки замка в этот участок кода) только если во время

    #работы приложения выставить атрибут “только чтение” для данного пользователя

    #относительно файла parsed_data.txt


Далее идет реализация функции get_and_parse_page:

def get_and_parse_page(target_link):
#Обьявление функции, аргумент – ссылка на страницу

    global PROXY

    #Указывает на то, что в данной функции используется переменная PROXY

    #из глобального пространства имен

    global HEADERS

    #То же и для переменной Headers

    if PROXY is not None:

    #Если значение PROXY не равно None

        proxy_handler = urllib2.ProxyHandler( { "http": ""+PROXY+"/" } )

        #Создается Прокси-Хэндлер с указанным прокси

        opener = urllib2.build_opener(proxy_handler)

        #Далее создается opener c созданным ранее Прокси-Хэндлером

        urllib2.install_opener(opener)

        #И наконец-то он устанавливается, теперь нет необходимости в 

        #шаманствах, все запросы в которых будет использоваться urllib2 

        #(в пределах этой функции будут направляться через указанный ранее 

        #PROXY)

    page_request = urllib2.Request(url=target_link, headers=HEADERS)

    #Создается обьект Request, который олицетворяет собой Request instance,

    #фактически это GET запрос к серверу с указанными параметрами, мне 

    #же необходимо использовать заголовки...

    try:

    #Обработка всех возможных ошибок, возникающих во время получения

    #страницы, это нехорошо, но лучше чем полное отсутствие обработки

        page = urllib2.urlopen(url=page_request).read().decode("UTF-8", "replace")

        #Переменной page присваиваем прочитанное значение страницы запроса, переведенное 

        #в unicode из кодировки UTF-8 (кодировка, используемая на www.google.com) (в 

        #Python 2.6 unicode -это отдельный тип данных(!))

    except Exception ,error:

    #Сам перехват ошибки и сохранение ее значения в переменную error

        print str(error)

        #Вывод ошибки в консоль, прведварительно переведя ее в строку 

        #(просто на всякий случай)

        return "ERROR"

        #Возврат из функции в том случае, если во время работы возникла ошибка

    harvested_data = re.findall(r'''\<li\ class\=g\>\<h4\ class\=r\>\<a\ href\=\"(.*?)".*?>(.*?)\<\/a\>\<\/h4\>''', page)

    #Сбор со страницы поиска ссылок и title найденных страниц

    #Очистка данных от результатов поиска по блогам, картинкам и др. сервисам гугла

    for data in harvested_data:

    #Для каждого элемента массива harvested_data присвоить ему имя data

        if data[0].startswith("/"):

        #Если нулевой элемент массива data(ссылка) начинается с символа /

            harvested_data.remove(data)

            #Удаляем его из массива harvested_data

        if ".google.com" in data[0]:

        #Если нулевой элемент массива data(ссылка) имеет в себе .google.com

            harvested_data.remove(data)

            #Также удаляем его из массива harvested_data

    return harvested_data

    #Возвращаем собранные значения из функции


Наконец-то дошла очередь до реализации основного тела приложения:

def main():
#Обявление функции, входных аргментов нет

    print "STARTED"

    #Вывод в консоль о начале процесса

    global THREADS_COUNT

    global DEEP

    global ENCODING

    #Обьявляние о том что эти переменные будут использоваться

    #из глобального пространства имен

    with open("requests.txt") as requests:

    #Открываем файл requests в котором находятся запросы к поисковику

         for request in requests:

         #На данном файлхэндлере доступен итератор, поэтому можно 

         #пройтись по файлу циклом, без загрузки файл в оперативку, но это 

         #тоже не важно, я все равно его туда загружу:)

                request = request.translate(None, "\r\n").decode(ENCODING, "replace")

                #Очистка запроса от символов конца строки а также их 

                #перевод в юникод (с заменой конфликтных символов)

                empty_link = "www.google.com/search?hl=ru&client=opera&rls=ru&hs=67v&q={request}&start={N}&sa=N"

                #Это пустой адрес страницы поиска, отформатирован

                for i in xrange(0, DEEP, 10):

                #Проход итератором по диапазону #чисел от 0 до DEEP, 

                #который представляет собой максимальную глубину поиска с 

                #шагом в 10, т.е. получаем из этого диапазона только 

                #числа десятков, т.е. 10, 20, 30 (как идет поиск у гугла)

                     queue.put(empty_link.format(request=request.encode("UTF-8"), N=i))

                     #Добавление в очередь каждой сгенерированной ссылки

                     #и перевод её в кодировку UTF-8 (для гугла)

    for _ in xrange(THREADS_COUNT):

    #Проход циклом по диапазону чисел количества потоков

        thread_ = threading.Thread(target=worker)

        #Создается поток, target-имя функции, которая являет собой 

        #участок кода, выполняемый многопоточно

        thread_.start()

        #Вызывается метод start() , таким образом поток запускается

    while threading.active_count() >1:

    #До тех пор, пока количество активных потоков больше 1 (значит, 

    #запущенные потоки продолжают работу)

        time.sleep(1)

        #Основной поток засыпает на 1 секунду

    print "FINISHED"

    #Вывод в консоль о завершении работы приложения


В итоге получаем нормально работающий многопоточный парсер. Естественно с многими минусами, но красиво написанное я задолбусь комментировать.

Код:

Эта статья + исходники: sendspace.com/file/mw0pac
Код с русскими коментариями: dumpz.org/15202
Код с украинскими коментариями: dumpz.org/15201

P.S. Да я знаю, что кому-то этот пример покажется нерациональным использованием Queue (привет, cr0w). Но вот обработку ошибок проще всего делать именно, используя его.

P.P.S. Материал не претендует на непогрешимость. Естественно, тут 100% быдлокод, никакого понимания мной того что я описываю, непонятки с терминами, я-быдлокодер и т.д. и т.п. НО тут есть то, чего вам не пересрать — оно РАБОТАЕТ, причем работает именно так, как от него ожидается, код понятен и откомментирован так, что будет понятно даже младенцу.Надеюсь, что оно хоть кому-то поможет…

© login999
uasc.org.ua

Python 3 — многопоточное программирование

Запуск нескольких потоков аналогичен запуску нескольких различных программ одновременно, но со следующими преимуществами:

  • Несколько потоков внутри процесса совместно используют одно и то же пространство данных с основным потоком и поэтому могут обмениваться информацией или общаться друг с другом легче, чем если бы они были отдельными процессами.

  • Потоки иногда называют легкими процессами, и они не требуют больших затрат памяти; они дешевле, чем процессы.

Несколько потоков внутри процесса совместно используют одно и то же пространство данных с основным потоком и поэтому могут обмениваться информацией или общаться друг с другом легче, чем если бы они были отдельными процессами.

Потоки иногда называют легкими процессами, и они не требуют больших затрат памяти; они дешевле, чем процессы.

Поток имеет начало, последовательность выполнения и заключение. У него есть указатель инструкции, который отслеживает, где в его контексте он работает.

  • Это может быть прервано (прервано).

  • Он может быть временно приостановлен (также известный как спящий режим) во время работы других потоков — это называется уступкой.

Это может быть прервано (прервано).

Он может быть временно приостановлен (также известный как спящий режим) во время работы других потоков — это называется уступкой.

Есть два разных вида темы —

  • поток ядра
  • пользовательская нить

Потоки ядра являются частью операционной системы, тогда как потоки пространства пользователя не реализованы в ядре.

Есть два модуля, которые поддерживают использование потоков в Python3 —

  • _нить
  • нарезания резьбы

Потоковый модуль давно устарел. Пользователям рекомендуется использовать модуль потоков. Следовательно, в Python 3 модуль «поток» больше не доступен. Тем не менее, он был переименован в «_thread» для обратной совместимости в Python3.

Начиная новую тему

Чтобы создать другой поток, вам нужно вызвать следующий метод, доступный в модуле потока:

_thread.start_new_thread ( function, args[, kwargs] )

Этот вызов метода позволяет быстро и эффективно создавать новые потоки как в Linux, так и в Windows.

Вызов метода немедленно возвращается, и дочерний поток запускается и вызывает функцию с переданным списком аргументов . Когда функция возвращается, поток завершается.

Здесь args — это кортеж аргументов; используйте пустой кортеж для вызова функции без передачи аргументов. kwargs — это необязательный словарь аргументов ключевых слов.

пример

#!/usr/bin/python3

import _thread
import time

# Define a function for the thread
def print_time( threadName, delay):
   count = 0
   while count < 5:
      time.sleep(delay)
      count += 1
      print ("%s: %s" % ( threadName, time.ctime(time.time()) ))

# Create two threads as follows
try:
   _thread.start_new_thread( print_time, ("Thread-1", 2, ) )
   _thread.start_new_thread( print_time, ("Thread-2", 4, ) )
except:
   print ("Error: unable to start thread")

while 1:
   pass

Выход

Когда приведенный выше код выполняется, он дает следующий результат —

Thread-1: Fri Feb 19 09:41:39 2016
Thread-2: Fri Feb 19 09:41:41 2016
Thread-1: Fri Feb 19 09:41:41 2016
Thread-1: Fri Feb 19 09:41:43 2016
Thread-2: Fri Feb 19 09:41:45 2016
Thread-1: Fri Feb 19 09:41:45 2016
Thread-1: Fri Feb 19 09:41:47 2016
Thread-2: Fri Feb 19 09:41:49 2016
Thread-2: Fri Feb 19 09:41:53 2016

Программа идет в бесконечном цикле. Вам придется нажать Ctrl-C, чтобы остановить

Хотя это очень эффективно для низкоуровневых потоков, модуль резьбы очень ограничен по сравнению с более новым модулем потоков.

Поточный модуль

Более новый модуль потоков, включенный в Python 2.4, обеспечивает гораздо более мощную высокоуровневую поддержку потоков, чем модуль потоков, рассмотренный в предыдущем разделе.

Модуль потоков предоставляет все методы модуля потоков и предоставляет некоторые дополнительные методы:

  • threading.activeCount () — возвращает количество активных объектов потока.

  • threading.currentThread () — возвращает количество объектов потока в элементе управления потока вызывающей стороны.

  • threading.enumerate () — возвращает список всех объектов потока, которые в данный момент активны.

threading.activeCount () — возвращает количество активных объектов потока.

threading.currentThread () — возвращает количество объектов потока в элементе управления потока вызывающей стороны.

threading.enumerate () — возвращает список всех объектов потока, которые в данный момент активны.

В дополнение к методам, у модуля потоков есть класс Thread, который реализует потоки. Методы, предоставляемые классом Thread , следующие:

  • run () — Метод run () является точкой входа для потока.

  • start () — метод start () запускает поток, вызывая метод run.

  • join ([время]) — join () ожидает завершения потоков.

  • isAlive () — метод isAlive () проверяет, выполняется ли еще поток.

  • getName () — Метод getName () возвращает имя потока.

  • setName () — Метод setName () устанавливает имя потока.

run () — Метод run () является точкой входа для потока.

start () — метод start () запускает поток, вызывая метод run.

join ([время]) — join () ожидает завершения потоков.

isAlive () — метод isAlive () проверяет, выполняется ли еще поток.

getName () — Метод getName () возвращает имя потока.

setName () — Метод setName () устанавливает имя потока.

Создание темы с использованием модуля Threading

Чтобы реализовать новый поток с помощью модуля потоков, вы должны сделать следующее:

  • Определите новый подкласс класса Thread .

  • Переопределите метод __init __ (self [, args]), чтобы добавить дополнительные аргументы.

  • Затем переопределите метод run (self [, args]), чтобы реализовать то, что поток должен делать при запуске.

Определите новый подкласс класса Thread .

Переопределите метод __init __ (self [, args]), чтобы добавить дополнительные аргументы.

Затем переопределите метод run (self [, args]), чтобы реализовать то, что поток должен делать при запуске.

Создав новый подкласс Thread , вы можете создать его экземпляр, а затем запустить новый поток, вызвав метод start () , который, в свою очередь, вызывает метод run () .

пример

#!/usr/bin/python3

import threading
import time

exitFlag = 0

class myThread (threading.Thread):
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter
   def run(self):
      print ("Starting " + self.name)
      print_time(self.name, self.counter, 5)
      print ("Exiting " + self.name)

def print_time(threadName, delay, counter):
   while counter:
      if exitFlag:
         threadName.exit()
      time.sleep(delay)
      print ("%s: %s" % (threadName, time.ctime(time.time())))
      counter -= 1

# Create new threads
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# Start new Threads
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print ("Exiting Main Thread")

Результат

Когда мы запускаем вышеуказанную программу, она дает следующий результат —

Starting Thread-1
Starting Thread-2
Thread-1: Fri Feb 19 10:00:21 2016
Thread-2: Fri Feb 19 10:00:22 2016
Thread-1: Fri Feb 19 10:00:22 2016
Thread-1: Fri Feb 19 10:00:23 2016
Thread-2: Fri Feb 19 10:00:24 2016
Thread-1: Fri Feb 19 10:00:24 2016
Thread-1: Fri Feb 19 10:00:25 2016
Exiting Thread-1
Thread-2: Fri Feb 19 10:00:26 2016
Thread-2: Fri Feb 19 10:00:28 2016
Thread-2: Fri Feb 19 10:00:30 2016
Exiting Thread-2
Exiting Main Thread

Синхронизация потоков

Модуль потоков, поставляемый с Python, включает в себя простой в реализации механизм блокировки, который позволяет синхронизировать потоки. Новая блокировка создается путем вызова метода Lock () , который возвращает новую блокировку.

Метод получения (блокировки) нового объекта блокировки используется, чтобы заставить потоки работать синхронно. Необязательный параметр блокировки позволяет вам контролировать, ожидает ли поток получения блокировки.

Если для блокировки установлено значение 0, поток немедленно возвращает значение 0, если блокировка не может быть получена, и 1, если блокировка получена. Если для блокировки установлено значение 1, поток блокируется и ожидает снятия блокировки.

Метод release () нового объекта блокировки используется для снятия блокировки, когда она больше не требуется.

пример

#!/usr/bin/python3

import threading
import time

class myThread (threading.Thread):
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter
   def run(self):
      print ("Starting " + self.name)
      # Get lock to synchronize threads
      threadLock.acquire()
      print_time(self.name, self.counter, 3)
      # Free lock to release next thread
      threadLock.release()

def print_time(threadName, delay, counter):
   while counter:
      time.sleep(delay)
      print ("%s: %s" % (threadName, time.ctime(time.time())))
      counter -= 1

threadLock = threading.Lock()
threads = []

# Create new threads
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# Start new Threads
thread1.start()
thread2.start()

# Add threads to thread list
threads.append(thread1)
threads.append(thread2)

# Wait for all threads to complete
for t in threads:
   t.join()
print ("Exiting Main Thread")

Выход

Когда приведенный выше код выполняется, он дает следующий результат —

Starting Thread-1
Starting Thread-2
Thread-1: Fri Feb 19 10:04:14 2016
Thread-1: Fri Feb 19 10:04:15 2016
Thread-1: Fri Feb 19 10:04:16 2016
Thread-2: Fri Feb 19 10:04:18 2016
Thread-2: Fri Feb 19 10:04:20 2016
Thread-2: Fri Feb 19 10:04:22 2016
Exiting Main Thread

Многопоточная приоритетная очередь

Модуль Queue позволяет вам создать новый объект очереди, который может содержать определенное количество элементов. Существуют следующие способы управления очередью:

  • get () — get () удаляет и возвращает элемент из очереди.

  • put () — Put добавляет элемент в очередь.

  • qsize () — qsize () возвращает количество элементов, которые в данный момент находятся в очереди.

  • empty () — empty () возвращает True, если очередь пуста; в противном случае Ложь.

  • full () — full () возвращает True, если очередь заполнена; в противном случае Ложь.

get () — get () удаляет и возвращает элемент из очереди.

put () — Put добавляет элемент в очередь.

qsize () — qsize () возвращает количество элементов, которые в данный момент находятся в очереди.

empty () — empty () возвращает True, если очередь пуста; в противном случае Ложь.

full () — full () возвращает True, если очередь заполнена; в противном случае Ложь.

пример

#!/usr/bin/python3

import queue
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
   def __init__(self, threadID, name, q):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.q = q
   def run(self):
      print ("Starting " + self.name)
      process_data(self.name, self.q)
      print ("Exiting " + self.name)

def process_data(threadName, q):
   while not exitFlag:
      queueLock.acquire()
      if not workQueue.empty():
         data = q.get()
         queueLock.release()
         print ("%s processing %s" % (threadName, data))
      else:
         queueLock.release()
         time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1

# Create new threads
for tName in threadList:
   thread = myThread(threadID, tName, workQueue)
   thread.start()
   threads.append(thread)
   threadID += 1

# Fill the queue
queueLock.acquire()
for word in nameList:
   workQueue.put(word)
queueLock.release()

# Wait for queue to empty
while not workQueue.empty():
   pass

# Notify threads it's time to exit
exitFlag = 1

# Wait for all threads to complete
for t in threads:
   t.join()
print ("Exiting Main Thread")

Выход

Когда приведенный выше код выполняется, он дает следующий результат —

Многопоточность в Python. Разбираемся подробней.

Возможно вы все пользовались потоками в Питоне, типа как я. Запустили там, пару тысяч потоков и что там вроде делается , висит , тупит . Но я решил разобраться с этой темой поподробней.

Что такое Thread (Поток)?

Thread – это отдельный поток выполнения. Это означает, что в вашей программе могут работать две и более подпрограммы одновременно. Ваши программы работает только на одном процессоре. Различные задачи, внутри потоков выполняются на одном ядре, а операционная система управляет, когда ваша программа работает с каким потоком. Как то туманно , да ? Мне очень понравилась аналогия которую прочитал в статье , я ее процитирую :

Лучшая аналогия, которую я читал «Введение Async IO в Python: полное прохождение» : которое сравнивает этот процесс с шахматистом-гроссмейстером, соревнующимся одновременно со многими противниками. Это всего лишь один человек, ему нужно переключаться между задачами (играми) и помнить состояние (обычно это называется state) для каждой игры.

webdevblog.ru

В Python есть стандартная библиотека для работы с потоками threading и класс Thread. Давайте что нибудь простое изобразим :

import threading
import time

def potoc (name):
    print('Поток '+str(name) +' стартовал.')
    time.sleep(2) # Спим
    print('Поток ' + str(name) +' остановился.')
print ('Создаем поток')
x = threading.Thread(target=potoc, args=(1,)) # Создание потока
print('Запускаем поток.')
x.start() # Запуск потока
print('Ждем когда завершиться поток.')
print('Конец программы.')
  • target=potoc — Передаем имя функции которая будет выполняться(подпрограмма).
  • args=(1,) — Передаем список аргументов функции.

Если вы заметили программа продолжила выполняться дальше. Это как раз и есть наглядная демонстрация что такое поток. Функция выполняется не зависимо от самой программы(в своем потоке) или на оборот. Но на самом деле программа не завершилась после выполнения print('Конец программы.'). Она ожидает выхода, потому что сам поток находится в спящем режиме. Как только он завершит работу и напечатает сообщение, вызовется метод .join() и программа сможет выйти. Тоже самое мы может сделать и сами, вызвать метод .join() для ожидания завершения запущенных потоков в любом месте программы.

print('Ждем когда завершиться поток.')
x.join()
print('Конец программы.')

Но есть и другой путь. Запустить демонический поток , который будет остановлен сразу после завершения программы. Еще проще, запустили, забыли и бросили на произвол судьбы. Что в нем там происходит , завершился он и т.д

демон (deamon)  — это компьютерная программа , которая работает в фоновом режиме, а не находится под непосредственным управлением интерактивного пользователя.

x = threading.Thread(target=potoc, args=(1,), daemon = True) # Создание демонического потока
Не забудьте закомментировать x.join() 

Давайте теперь запустим побольше потоков, штучек пять:

import threading
import time

def potoc (name):
    print('Поток '+str(name) +' стартовал.')
    time.sleep(2) # Спим
    print('Поток ' + str(name) +' остановился.')
for i in range(5):
    print ('Создаем поток '+str(i))
    x = threading.Thread(target=potoc, args=(i,))
    print('Запускаем поток '+str(i))
    x.start() # Запуск потока
print('Ждем когда завершиться поток.')
#x.join()
print('Конец программы.')

Запустив несколько раз код вы поймете, что потоки не смотря на то что создаются и запускаются друг за другом, завершаются всегда по разному. Дело в том что порядок выполнения потоков определяется операционной системой и его может быть довольно сложно предсказать или не возможно.

Для эксперимента можете попробовать запустить скажем миллион потоков ))) В этом тоже есть опасность, система имеет конечные ресурсы. Конечно же в Питоне есть методы для управления и контроля потоков.

Использование ThreadPoolExecutor.

Из названия думаю все понятно или не очень ))) Экзекутор Пула Потоков, Ахха-ха-ха-ха ))) Ну а если серьезно, этот метод нам поможет контролировать пул потоков. Давайте сразу пример и там разберемся.

import time
import concurrent.futures

def potoc (name):
    print('Поток '+str(name) +' стартовал.')
    time.sleep(2) # Спим
    print('Поток ' + str(name) +' остановился.')

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(potoc, range(1,6))
#print('Ждем когда завершиться поток.')
#x.join()
print('Конец программы.')

ThreadPoolExecutor входит в стандартную библиотеку Питона: concurrent.futures — Запуск параллельных задач

  • concurrent.futures.ThreadPoolExecutor(max_workers=3) — Запускаем группу(пул) потолок, max_workers=3 — максимальное число работающих одновременно.
  • executor.map(potoc, range(1,6)) — Методом .map мы создаем поток и добавляем его в пул. Potoc — функция , и передаем переменную в диапазоне от 1 до 5. То есть, будет выполнено пять раз.

Собственно все это мы обернули в менеджер контекст with…as. Что из этого вышло : запустилось три потока, и как только какой то поток завершился, наш экзекутор запускает следующий из пула. Идея всего этого типа такая.

Примечание. Использование ThreadPoolExecutor может привести к ошибкам. Например, если вы вызываете функцию, которая не принимает параметров, но передаете ей параметры в .map(), в этом случае поток выдаст исключение. К сожалению, ThreadPoolExecutor скрывает это исключение, и (в случае выше) программа завершается без вывода. Это может стать довольно запутанным при отладке.

Вы библиотеке также есть класс ProcessPoolExecutor для процессов.

with concurrent.futures.ProcessPoolExecutor() as executor:
result = executor.map(function, iterable)

Условия гонки (Race Conditions).

Условия гонки могут возникать, когда два или более потока обращаются к общему фрагменту данных или ресурсу. Например выполняя простую операцию с общими данными в потоке х = х + 1 , операционная система передаст управление другому потоку сразу после прочтения значения х . Вторая часть : +1 и запись нового значения в переменную х будет исполнено позже. Думаю понятно что это может принести очень много проблем при отладке программы. Есть конечно методы борьбы с этим, например threading.Lock() (блокировка потока).

Блокировку может использовать только один поток одновременно. Если другой поток захочет вызвать блокировку в это же время, ему придётся ждать пока не разблокируется другой поток. Будьте внимательны , если вы не освободите поток, ваша программа просто зависнет ! Для блокировки потока используется метод .acquire() и освобождение потока .release(). Давайте попробуем что нибудь симулировать.

import threading
import time
import concurrent.futures

def potoc (name):
    #lock.acquire()
    global x
    local = x * 2
    x = local
    #lock.release()
    time.sleep(5)
for z in range(10):
    x = 1
    for i in range(1,11):
        l = threading.Thread(target=potoc, args=(i,))
        l.start()
    l.join()
    print('Величина переменно', x)

Теперь добавим блокировку потока во время обращения к переменной.

import threading
import time
import concurrent.futures

def potoc (name):
    global x
    print('Поток '+str(name) +' стартовал.')
    lock.acquire()
    local = x
    print (str(local)+'\n')
    x = local * 2
    lock.release()
    time.sleep(5) # Спим

for z in range(10):
    print ('Новый цикл')
    x = 1
    lock = threading.Lock()
    for i in range(1,11):
        l = threading.Thread(target=potoc, args=(i,))
        l.start()
    l.join()
    print('Величина переменно', x)

Сколько потоком запускать ?

Это наверное первый вопрос который возник у вас в голове. Я не дам однозначного ответа, но есть такое мнение : сколько ядер\виртуальных потоков у процессора, столько потоков и запускать одновременно. Все зависит от компьютера и задач которые вы пытаетесь выполнить.

Если вы нашли ошибку, пожалуйста, выделите фрагмент текста и нажмите Ctrl+Enter.

Threading — управление параллельными потоками

Цель: Создание модуля thread для простого управления несколькими потоками выполнения.
Доступно в версии: 1.5.2 и выше.

Модуль threading построен на низкоуровневых функциях thread, что делает работу с потоками проще. Использование потоков позволяет программе запускать одновременно несколько операций в одном пространстве процесса.

Самый простой способ использовать поток — создать его с помощью целевой функции и запустить с помощью метода start().

import threading

def worker():
    """thread worker function"""
    print 'Worker'
    return

threads = []
for i in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()


Результат работы программы – пять строк со строкой «Worker»:

$ python threading_simple.py

Worker
Worker
Worker
Worker
Worker

В приведенном ниже примере в качестве аргумента потоку передается число для вывода.

import threading

def worker(num):
    """thread worker function"""
    print 'Worker: %s' % num
    return

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

Целочисленный аргумент теперь включен в сообщение, выводимое каждым потоком:

$ python -u threading_simpleargs.py

Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4

Использование аргументов для идентификации потока является трудоемким процессом. Каждый экземпляр Thread имеет имя со значением, присваиваемым по умолчанию. Оно может быть изменено, когда создается поток.

Именование потоков полезно в серверных процессах с несколькими служебными потоками, обрабатывающими различные операции.

import threading
import time

def worker():
    print threading.currentThread().getName(), 'Starting'
    time.sleep(2)
    print threading.currentThread().getName(), 'Exiting'

def my_service():
    print threading.currentThread().getName(), 'Starting'
    time.sleep(3)
    print threading.currentThread().getName(), 'Exiting'

t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker) # используем имя по умолчанию

w.start()
w2.start()
t.start()

Программа выводит имя текущего потока в каждой строке. «Thread-1» — это безымянный поток w2.

$ python -u threading_names.py

worker Thread-1 Starting
my_service Starting
Starting
Thread-1worker Exiting
 Exiting
my_service Exiting

Большинство программ не используют print для отладки. Модуль logging поддерживает добавление имени потока в каждое сообщение журнала с помощью % (threadName)s. Включение имен потоков в журнал облегчает отслеживание этих сообщений.

import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
                    )

def worker():
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')

def my_service():
    logging.debug('Starting')
    time.sleep(3)
    logging.debug('Exiting')

t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker) # use default name

w.start()
w2.start()
t.start()

Модуль logging также является поточно-ориентированным, поэтому сообщения из разных потоков сохранятся в выводимых данных.

$ python threading_names_log.py

[DEBUG] (worker    ) Starting
[DEBUG] (Thread-1  ) Starting
[DEBUG] (my_service) Starting
[DEBUG] (worker    ) Exiting
[DEBUG] (Thread-1  ) Exiting
[DEBUG] (my_service) Exiting

До этого момента примеры программ ожидали, пока все потоки не завершат свою работу. Иногда программы порождают такой поток, как демон. Он работает, не блокируя завершение основной программы.

Использование демона полезно, если не удается прервать поток или завершить его в середине работы, не потеряв и не повредив при этом данные.

Чтобы пометить поток как demon, вызовите метод setDaemon() с логическим аргументом. По умолчанию потоки не являются «демонами», поэтому передача в качестве аргумента значения True включает режим demon.

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def daemon():
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')

d = threading.Thread(name='daemon', target=daemon)
d.setDaemon(True)

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

Обратите внимание, что в выводимых данных отсутствует сообщение «Exiting» от потока-демона. Все потоки, не являющиеся «демонами» (включая основной поток), завершают работу до того, как поток-демон выйдет из двухсекундного сна.

$ python threading_daemon.py

(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting

Чтобы дождаться завершения работы потока-демона, используйте метод join().

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def daemon():
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')

d = threading.Thread(name='daemon', target=daemon)
d.setDaemon(True)

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join()
t.join()

Метод join() позволяет demon вывести сообщение «Exiting».

$ python threading_daemon_join.py

(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
(daemon    ) Exiting

Также можно передать аргумент задержки (количество секунд, в течение которых поток будет неактивным).

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def daemon():
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')

d = threading.Thread(name='daemon', target=daemon)
d.setDaemon(True)

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join(1)
print 'd.isAlive()', d.isAlive()
t.join()

Истекшее время ожидания меньше, чем время, в течение которого поток-демон спит. Поэтому поток все еще «жив» после того, как метод join() продолжит свою работу.

$ python threading_daemon_join_timeout.py

(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
d.isAlive() True

Можно не сохранять дескрипторы всех потоков-демонов, чтобы убедиться в их завершении до выхода из основного процесса. enumerate() возвращает список активных экземпляров Thread. Список включает в себя текущий поток. Но присоединение к текущему потоку не разрешено (это приводит к ситуации взаимной блокировки), его необходимо пропустить.

import random
import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def worker():
    """thread worker function"""
    t = threading.currentThread()
    pause = random.randint(1,5)
    logging.debug('sleeping %s', pause)
    time.sleep(pause)
    logging.debug('ending')
    return

for i in range(3):
    t = threading.Thread(target=worker)
    t.setDaemon(True)
    t.start()

main_thread = threading.currentThread()
for t in threading.enumerate():
    if t is main_thread:
        continue
    logging.debug('joining %s', t.getName())
    t.join()

Поскольку worker спит в течение случайного отрезка времени, выходные данные программы могут отличаться. Это должно выглядеть примерно так:

$ python threading_enumerate.py

(Thread-1  ) sleeping 3
(Thread-2  ) sleeping 2
(Thread-3  ) sleeping 5
(MainThread) joining Thread-1
(Thread-2  ) ending
(Thread-1  ) ending
(MainThread) joining Thread-3
(Thread-3  ) ending
(MainThread) joining Thread-2

При запуске Thread выполняет базовую инициализацию и затем вызывает свой метод run(). Он в свою очередь вызывает целевую функцию, переданную конструктору. Чтобы создать подкласс Thread, переопределите run().

import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

class MyThread(threading.Thread):

    def run(self):
        logging.debug('running')
        return

for i in range(5):
    t = MyThread()
    t.start()

Возвращаемое значение метода run() игнорируется.

$ python threading_subclass.py

(Thread-1  ) running
(Thread-2  ) running
(Thread-3  ) running
(Thread-4  ) running
(Thread-5  ) running

Значения args и kwargs, передаваемые в конструктор Thread, сохраняются в private переменных. Поэтому к ним трудно получить доступ из подкласса.

Для передачи аргументов пользовательскому потоку, переопределите конструктор, чтобы сохранить значения в атрибуте экземпляра.

import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

class MyThreadWithArgs(threading.Thread):

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        self.args = args
        self.kwargs = kwargs
        return

    def run(self):
        logging.debug('running with %s and %s', self.args, self.kwargs)
        return

for i in range(5):
    t = MyThreadWithArgs(args=(i,), kwargs={'a':'A', 'b':'B'})
    t.start()

MyThreadWithArgs использует тот же API, что и Thread. Но другой класс может легко изменить метод конструктора, чтобы принимать другие аргументы, связанные с назначением потока.

$ python threading_subclass_args.py

(Thread-1  ) running with (0,) and {'a': 'A', 'b': 'B'}
(Thread-2  ) running with (1,) and {'a': 'A', 'b': 'B'}
(Thread-3  ) running with (2,) and {'a': 'A', 'b': 'B'}
(Thread-4  ) running with (3,) and {'a': 'A', 'b': 'B'}
(Thread-5  ) running with (4,) and {'a': 'A', 'b': 'B'}

Timer начинает свою работу после задержки и может быть отменен в любой момент этой задержки.

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def delayed():
    logging.debug('worker running')
    return

t1 = threading.Timer(3, delayed)
t1.setName('t1')
t2 = threading.Timer(3, delayed)
t2.setName('t2')

logging.debug('starting timers')
t1.start()
t2.start()

logging.debug('waiting before canceling %s', t2.getName())
time.sleep(2)
logging.debug('canceling %s', t2.getName())
t2.cancel()
logging.debug('done')

Второй таймер никогда не запускается, а первый запускается после завершения работы основной программы. Поскольку это не поток-демона, он присоединяется неявно, когда основной поток завершен.

$ python threading_timer.py

(MainThread) starting timers
(MainThread) waiting before canceling t2
(MainThread) canceling t2
(MainThread) done
(t1        ) worker running

Бывают случаи, когда нужно синхронизировать операции в двух или более потоках. Простой способ реализации – использование объектов Event.

Event управляет внутренним флагом, который вызывающий объект может либо устанавливать (set()) либо сбрасывать (clear()). Другие потоки могут ждать (wait()), пока флаг не будет установлен (set()),блокируя процесс, пока не будет разрешено продолжить выполнение.

import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )
                    
def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('event set: %s', event_is_set)

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.isSet():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')


e = threading.Event()
t1 = threading.Thread(name='block', 
                      target=wait_for_event,
                      args=(e,))
t1.start()

t2 = threading.Thread(name='non-block', 
                      target=wait_for_event_timeout, 
                      args=(e, 2))
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(3)
e.set()
logging.debug('Event is set')

Метод wait() принимает время задержки. Он возвращает логическое значение, указывающее, установлено событие или нет. Поэтому вызывающий объект знает, почему был возвращен wait(). Метод isSet() можно использовать для события отдельно, не опасаясь блокировки.

В этом примере wait_for_event_timeout() проверяет состояние события без бесконечной блокировки. wait_for_event() блокирует вызов wait(), который не возобновляет свою работу до изменения статуса события.

$ python threading_event.py

(block     ) wait_for_event starting
(non-block ) wait_for_event_timeout starting
(MainThread) Waiting before calling Event.set()
(non-block ) event set: False
(non-block ) doing other work
(non-block ) wait_for_event_timeout starting
(MainThread) Event is set
(block     ) event set: True
(non-block ) event set: True
(non-block ) processing event

Помимо синхронизации операций с потоками, также важно иметь возможность контролировать доступ к общим ресурсам, чтобы предотвратить повреждение данных.

Встроенные в Python структуры данных (списки, словари и т. д.) являются поточно-ориентированными. Другие структуры данных, реализованные в Python, и более простые типы (целые числа и числа с плавающей запятой) имеют такой защиты. Для защиты от одновременного доступа к объекту используйте объект Lock.

import logging
import random
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )
                    
class Counter(object):
    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start
    def increment(self):
        logging.debug('Waiting for lock')
        self.lock.acquire()
        try:
            logging.debug('Acquired lock')
            self.value = self.value + 1
        finally:
            self.lock.release()

def worker(c):
    for i in range(2):
        pause = random.random()
        logging.debug('Sleeping %0.02f', pause)
        time.sleep(pause)
        c.increment()
    logging.debug('Done')

counter = Counter()
for i in range(2):
    t = threading.Thread(target=worker, args=(counter,))
    t.start()

logging.debug('Waiting for worker threads')
main_thread = threading.currentThread()
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
logging.debug('Counter: %d', counter.value)

В этом примере функция worker() увеличивает экземпляр Counter, который управляет Lock, чтобы два потока не могли одновременно изменить свое внутреннее состояние. Если Lock не использовался, можно пропустить изменение значения атрибута.

$ python threading_lock.py

(Thread-1  ) Sleeping 0.47
(Thread-2  ) Sleeping 0.65
(MainThread) Waiting for worker threads
(Thread-1  ) Waiting for lock
(Thread-1  ) Acquired lock
(Thread-1  ) Sleeping 0.90
(Thread-2  ) Waiting for lock
(Thread-2  ) Acquired lock
(Thread-2  ) Sleeping 0.11
(Thread-2  ) Waiting for lock
(Thread-2  ) Acquired lock
(Thread-2  ) Done
(Thread-1  ) Waiting for lock
(Thread-1  ) Acquired lock
(Thread-1  ) Done
(MainThread) Counter: 4

Чтобы выяснить, применил ли другой поток блокировку, не задерживая текущий поток, передайте значение False аргументу blocking функции acquire().

В следующем примере worker() пытается применить блокировку три раза и подсчитывает, сколько попыток нужно сделать. А lock_holder() выполняет циклическое переключение между снятием и запуском блокировки с короткими паузами в каждом состоянии, используемом для имитации загрузки.

import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )
                    
def lock_holder(lock):
    logging.debug('Starting')
    while True:
        lock.acquire()
        try:
            logging.debug('Holding')
            time.sleep(0.5)
        finally:
            logging.debug('Not holding')
            lock.release()
        time.sleep(0.5)
    return
                    
def worker(lock):
    logging.debug('Starting')
    num_tries = 0
    num_acquires = 0
    while num_acquires < 3:
        time.sleep(0.5)
        logging.debug('Trying to acquire')
        have_it = lock.acquire(0)
        try:
            num_tries += 1
            if have_it:
                logging.debug('Iteration %d: Acquired',  num_tries)
                num_acquires += 1
            else:
                logging.debug('Iteration %d: Not acquired', num_tries)
        finally:
            if have_it:
                lock.release()
    logging.debug('Done after %d iterations', num_tries)


lock = threading.Lock()

holder = threading.Thread(target=lock_holder, args=(lock,), name='LockHolder')
holder.setDaemon(True)
holder.start()

worker = threading.Thread(target=worker, args=(lock,), name='Worker')
worker.start()

worker() требуется более трех итераций, чтобы применить блокировку три раза.

$ python threading_lock_noblock.py

(LockHolder) Starting
(LockHolder) Holding
(Worker    ) Starting
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 1: Acquired
(Worker    ) Trying to acquire
(LockHolder) Holding
(Worker    ) Iteration 2: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 3: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 4: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 5: Acquired
(Worker    ) Done after 5 iterations

Обычные объекты Lock не могут быть получены более одного раза даже одним и тем же потоком. Это может привести к нежелательным эффектам, если доступ к блокировке осуществляется несколькими функциями в одной цепочке вызовов.

import threading

lock = threading.Lock()

print 'First try :', lock.acquire()
print 'Second try:', lock.acquire(0)

Обе функции используют одну и ту же глобальную блокировку, а одна вызывает другую. Поэтому второй вызов завершится неудачно и будет заблокирован с использованием аргументов по умолчанию для acquire().

$ python threading_lock_reacquire.py

First try : True
Second try: False

В ситуации, когда отдельный код из одного и того же потока должен «повторно применить» блокировку, используйте RLock.

import threading

lock = threading.RLock()

print 'First try :', lock.acquire()
print 'Second try:', lock.acquire(0)

Единственным изменением в коде предыдущего примера является замена RLock на Lock .

$ python threading_rlock.py

First try : True
Second try: 1

Блокировки реализуют API context manager и совместимы с оператором with. Использование оператора with позволяет обойтись без блокировки.

import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def worker_with(lock):
    with lock:
        logging.debug('Lock acquired via with')
        
def worker_no_with(lock):
    lock.acquire()
    try:
        logging.debug('Lock acquired directly')
    finally:
        lock.release()

lock = threading.Lock()
w = threading.Thread(target=worker_with, args=(lock,))
nw = threading.Thread(target=worker_no_with, args=(lock,))

w.start()
nw.start()

Функции worker_with() и worker_no_with() управляют блокировкой эквивалентными способами.

$ python threading_lock_with.py

(Thread-1  ) Lock acquired via with
(Thread-2  ) Lock acquired directly

Другой способ синхронизации потоков – объект Condition. Поскольку Condition использует Lock, его можно привязать к общему ресурсу. Это позволяет потокам ожидать обновления ресурса.

В приведенном ниже примере поток consumer() будет ждать, пока не будет установлено Condition, прежде чем продолжить. Поток producer() отвечает за установку Condition и уведомление других потоков о том, что они могут продолжить выполнение.

import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s (%(threadName)-2s) %(message)s',
                    )

def consumer(cond):
    """wait for the condition and use the resource"""
    logging.debug('Starting consumer thread')
    t = threading.currentThread()
    with cond:
        cond.wait()
        logging.debug('Resource is available to consumer')

def producer(cond):
    """set up the resource to be used by the consumer"""
    logging.debug('Starting producer thread')
    with cond:
        logging.debug('Making resource available')
        cond.notifyAll()

condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))

c1.start()
time.sleep(2)
c2.start()
time.sleep(2)
p.start()

Потоки используют with для блокировки, связанной с Condition. Использование методов acquire() и release()в явном виде также работает.

$ python threading_condition.py

2013-02-21 06:37:49,549 (c1) Starting consumer thread
2013-02-21 06:37:51,550 (c2) Starting consumer thread
2013-02-21 06:37:53,551 (p ) Starting producer thread
2013-02-21 06:37:53,552 (p ) Making resource available
2013-02-21 06:37:53,552 (c2) Resource is available to consumer
2013-02-21 06:37:53,553 (c1) Resource is available to consumer

Как разрешить доступ к ресурсу нескольким worker одновременно, но при этом ограничить их количество. Например, пул соединений может поддерживать фиксированное число одновременных подключений, или сетевое приложение может поддерживать фиксированное количество одновременных загрузок. Semaphore является одним из способов управления соединениями.

import logging
import random
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s (%(threadName)-2s) %(message)s',
                    )

class ActivePool(object):
    def __init__(self):
        super(ActivePool, self).__init__()
        self.active = []
        self.lock = threading.Lock()
    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
            logging.debug('Running: %s', self.active)
    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
            logging.debug('Running: %s', self.active)

def worker(s, pool):
    logging.debug('Waiting to join the pool')
    with s:
        name = threading.currentThread().getName()
        pool.makeActive(name)
        time.sleep(0.1)
        pool.makeInactive(name)

pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
    t = threading.Thread(target=worker, name=str(i), args=(s, pool))
    t.start()

В этом примере класс ActivePool является удобным способом отслеживания того, какие потоки могут запускаться в данный момент. Реальный пул ресурсов будет выделять соединение для нового потока и восстанавливать значение, когда поток завершен. В данном случае он используется для хранения имен активных потоков, чтобы показать, что только пять из них работают одновременно.

$ python threading_semaphore.py

2013-02-21 06:37:53,629 (0 ) Waiting to join the pool
2013-02-21 06:37:53,629 (1 ) Waiting to join the pool
2013-02-21 06:37:53,629 (0 ) Running: ['0']
2013-02-21 06:37:53,629 (2 ) Waiting to join the pool
2013-02-21 06:37:53,630 (3 ) Waiting to join the pool
2013-02-21 06:37:53,630 (1 ) Running: ['0', '1']
2013-02-21 06:37:53,730 (0 ) Running: ['1']
2013-02-21 06:37:53,731 (2 ) Running: ['1', '2']
2013-02-21 06:37:53,731 (1 ) Running: ['2']
2013-02-21 06:37:53,732 (3 ) Running: ['2', '3']
2013-02-21 06:37:53,831 (2 ) Running: ['3']
2013-02-21 06:37:53,833 (3 ) Running: []

Некоторые ресурсы должны быть заблокированы, чтобы их могли использовать сразу несколько потоков. А другие должны быть защищены от просмотра в потоках, которые не «владеют» ими. Функция local() создает объект, способный скрывать значения для отдельных потоков.

import random
import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)


def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)

local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()

Обратите внимание, что значение local_data.value не доступно ни для одного потока, пока не будет установлено.

$ python threading_local.py

(MainThread) No value yet
(MainThread) value=1000
(Thread-1  ) No value yet
(Thread-1  ) value=34
(Thread-2  ) No value yet
(Thread-2  ) value=7

Чтобы все потоки начинались с одного и того же значения, используйте подкласс и установите атрибуты с помощью метода __init __() .

import random
import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )


def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)

def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)

class MyLocal(threading.local):
    def __init__(self, value):
        logging.debug('Initializing %r', self)
        self.value = value

local_data = MyLocal(1000)
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()

__init __() вызывается для каждого объекта (обратите внимание на значение id()) один раз в каждом потоке.

$ python threading_local_defaults.py

(MainThread) Initializing <__main__.MyLocal object at 0x100514390>
(MainThread) value=1000
(Thread-1  ) Initializing <__main__.MyLocal object at 0x100514390>
(Thread-1  ) value=1000
(Thread-2  ) Initializing <__main__.MyLocal object at 0x100514390>
(Thread-1  ) value=81
(Thread-2  ) value=1000
(Thread-2  ) value=54

Данная публикация представляет собой перевод статьи «threading – Manage concurrent threads» , подготовленной дружной командой проекта Интернет-технологии.ру

телеграм канал. Подпишись, будет полезно!

28) Многопоточность в Python — CoderLessons.com

Язык программирования Python позволяет вам использовать многопроцессорность или многопоточность. Из этого руководства вы узнаете, как писать многопоточные приложения на Python.

Что такое тема?

Поток — это единица исполнения при параллельном программировании. Многопоточность — это метод, который позволяет процессору одновременно выполнять множество задач одного процесса. Эти потоки могут выполняться отдельно при совместном использовании ресурсов процесса.

Что такое процесс?

Процесс — это в основном выполняемая программа. Когда вы запускаете приложение на своем компьютере (например, браузер или текстовый редактор), операционная система создает процесс.

Что такое многопоточность?

Многопоточность — это метод, который позволяет процессору выполнять несколько потоков одновременно. Эти потоки могут выполняться отдельно при совместном использовании ресурсов процесса.

Что такое многопроцессорная обработка?

Многопроцессорная обработка позволяет запускать несколько не связанных между собой процессов одновременно. Эти процессы не разделяют свои ресурсы и обмениваются данными через IPC.

Многопоточность Python против многопроцессорности

Чтобы понять процессы и потоки, рассмотрите следующий сценарий: .exe-файл на вашем компьютере — это программа. Когда вы открываете его, ОС загружает его в память, а процессор выполняет его. Экземпляр программы, которая сейчас запущена, называется процессом.

Каждый процесс будет иметь 2 основных компонента:

Теперь процесс может содержать одну или несколько частей, называемых потоками. Это зависит от архитектуры ОС. Вы можете думать о потоке как о разделе процесса, который может выполняться операционной системой отдельно.

Другими словами, это поток инструкций, которые ОС может выполнять независимо. Потоки в рамках одного процесса совместно используют данные этого процесса и предназначены для совместной работы для облегчения параллелизма.

В этом уроке вы узнаете,

Зачем использовать многопоточность?

Многопоточность позволяет разбить приложение на несколько подзадач и запускать эти задачи одновременно. Если вы используете многопоточность правильно, скорость вашего приложения, производительность и рендеринг могут быть улучшены.

Python MultiThreading

Python поддерживает конструкции как для многопроцессорной, так и для многопоточности. В этом уроке вы прежде всего сосредоточитесь на реализации многопоточных приложений с использованием Python. Существует два основных модуля, которые можно использовать для обработки потоков в Python:

  1. Модуль потока , и
  2. Резьб модуль

Тем не менее, в Python также есть нечто, называемое глобальной блокировкой интерпретатора (GIL). Это не позволяет значительно увеличить производительность и может даже снизить производительность некоторых многопоточных приложений. Вы узнаете все об этом в следующих разделах этого урока.

Модули Thread и Threading

Эти два модуля , которые вы узнаете в этом руководстве , является модулем резьбы и модуль нарезания резьбы .

Тем не менее, модуль потока давно устарел. Начиная с Python 3, он был обозначен как устаревший и доступен только как __thread для обратной совместимости.

Вы должны использовать модуль потоков верхнего уровня для приложений, которые вы собираетесь развернуть. Модуль потока был рассмотрен здесь только в образовательных целях.

Модуль потока

Синтаксис для создания нового потока с использованием этого модуля выглядит следующим образом:

thread.start_new_thread(function_name, arguments)

Хорошо, теперь вы рассмотрели базовую теорию, чтобы начать кодирование. Итак, откройте свой IDLE или блокнот и введите следующее:

import time
import _thread

def thread_test(name, wait):
   i = 0
   while i <= 3:
      time.sleep(wait)
      print("Running %s\n" %name)
      i = i + 1

   print("%s has finished execution" %name)

if __name__ == "__main__":
    
    _thread.start_new_thread(thread_test, ("First Thread", 1))
    _thread.start_new_thread(thread_test, ("Second Thread", 2))
    _thread.start_new_thread(thread_test, ("Third Thread", 3))

Сохраните файл и нажмите F5, чтобы запустить программу. Если все было сделано правильно, это вывод, который вы должны увидеть:

Вы узнаете больше об условиях гонки и о том, как справиться с ними в следующих разделах

ОБЪЯСНЕНИЕ КОДА

  1. Эти операторы импортируют модуль времени и потока, который используется для обработки и задержки потоков Python.
  2. Здесь вы определили функцию с именем thread_test, которая будет вызываться методом start_new_thread . Функция запускает цикл while для четырех итераций и печатает имя потока, который ее вызвал. По завершении итерации печатается сообщение о том, что поток завершил выполнение.
  3. Это основной раздел вашей программы. Здесь, вы просто вызовите start_new_thread метод с thread_test функции в качестве аргумента.

    Это создаст новый поток для функции, которую вы передадите в качестве аргумента, и начнете ее выполнять. Обратите внимание, что вы можете заменить это (thread _ test) любой другой функцией, которую вы хотите запустить как поток.

Поточный модуль

Этот модуль является высокоуровневой реализацией потоков в Python и стандартом де-факто для управления многопоточными приложениями. Он обеспечивает широкий спектр функций по сравнению с резьбовым модулем.

Структура модуля Threading

Вот список некоторых полезных функций, определенных в этом модуле:

Имя функции Описание
activeCount () Возвращает количество объектов Thread, которые еще живы
currentThread () Возвращает текущий объект класса Thread.
перечисление () Перечисляет все активные объекты Thread.
isDaemon () Возвращает true, если поток является демоном.
является живым() Возвращает true, если поток еще жив.
Методы класса Thread
Начните() Запускает активность потока. Он должен вызываться только один раз для каждого потока, потому что он вызовет ошибку времени выполнения, если вызывается несколько раз.
запустить() Этот метод обозначает активность потока и может быть переопределен классом, который расширяет класс Thread.
присоединиться() Он блокирует выполнение другого кода до тех пор, пока поток, в котором был вызван метод join (), не завершится.

Предыстория: класс Thread

Прежде чем приступить к написанию многопоточных программ с использованием модуля Threading, очень важно понять класс Thread. Класс потока — это основной класс, который определяет шаблон и операции потока в python.

Наиболее распространенный способ создания многопоточного приложения на Python — это объявление класса, который расширяет класс Thread и переопределяет его метод run ().

В общем, класс Thread обозначает кодовую последовательность, выполняемую в отдельном потоке управления.

Итак, при написании многопоточного приложения вы будете делать следующее:

  1. определить класс, который расширяет класс Thread
  2. Переопределить конструктор __init__
  3. Переопределение прогона () метод

Как только объект потока создан, метод start () может использоваться для начала выполнения этого действия, а метод join () может использоваться для блокировки всего другого кода до завершения текущего действия.

Теперь давайте попробуем использовать модуль потоков для реализации вашего предыдущего примера. Снова, запустите ваш IDLE и введите следующее:

import time
import threading

class threadtester (threading.Thread):
    def __init__(self, id, name, i):
       threading.Thread.__init__(self)
       self.id = id
       self.name = name
       self.i = i
       
    def run(self):
       thread_test(self.name, self.i, 5)
       print ("%s has finished execution " %self.name)

def thread_test(name, wait, i):

    while i:
       time.sleep(wait)
       print ("Running %s \n" %name)
       i = i - 1

if __name__=="__main__":
    thread1 = threadtester(1, "First Thread", 1)
    thread2 = threadtester(2, "Second Thread", 2)
    thread3 = threadtester(3, "Third Thread", 3)

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join()
    thread2.join()
    thread3.join()

Это будет вывод, когда вы выполните приведенный выше код:

ОБЪЯСНЕНИЕ КОДА

  1. Эта часть такая же, как в нашем предыдущем примере. Здесь вы импортируете модуль времени и потока, который используется для обработки выполнения и задержек потоков Python.
  2. В этом бите вы создаете класс с именем threadtester, который наследует или расширяет класс Thread модуля Threading. Это один из самых распространенных способов создания потоков в Python. Однако вам следует переопределить только конструктор и метод run () в вашем приложении. Как вы можете видеть в приведенном выше примере кода, метод __init__ (конструктор) был переопределен.

    Точно так же вы также переопределили метод run () . Он содержит код, который вы хотите выполнить внутри потока. В этом примере вы вызвали функцию thread_test ().

  3. Это метод thread_test (), который принимает значение i в качестве аргумента, уменьшает его на 1 на каждой итерации и перебирает оставшуюся часть кода до тех пор, пока i не станет равным 0. На каждой итерации он печатает имя текущего выполняющегося потока и спит в течение секунд ожидания (что также принимается в качестве аргумента).
  4. thread1 = threadtester (1, «Первая нить», 1)

    Здесь мы создаем поток и передаем три параметра, которые мы объявили в __init__. Первый параметр — это идентификатор потока, второй параметр — имя потока, а третий параметр — счетчик, который определяет, сколько раз должен выполняться цикл while.

  5. thread2.start ()

    Метод start используется для запуска выполнения потока. Внутри функция start () вызывает метод run () вашего класса.

  6. thread3.join ()

    Метод join () блокирует выполнение другого кода и ожидает завершения потока, в котором он был вызван.

Как вы уже знаете, потоки, находящиеся в одном процессе, имеют доступ к памяти и данным этого процесса. В результате, если более чем один поток пытается изменить или получить доступ к данным одновременно, могут появиться ошибки.

В следующем разделе вы увидите различные виды сложностей, которые могут проявиться, когда потоки получают доступ к данным и критическому разделу без проверки существующих транзакций доступа.

Тупики и условия гонки

Прежде чем узнавать о взаимоблокировках и условиях гонки, полезно понять несколько основных определений, связанных с параллельным программированием:

  • Критический раздел

    Это фрагмент кода, который обращается или изменяет общие переменные и должен выполняться как атомарная транзакция.

  • Переключение контекста

    Это процесс, за которым следует CPU, чтобы сохранить состояние потока перед тем, как перейти от одной задачи к другой, чтобы впоследствии можно было продолжить его с той же точки.

Тупики

Взаимные блокировки — наиболее опасная проблема, с которой сталкиваются разработчики при написании параллельных / многопоточных приложений на python. Лучший способ понять взаимоблокировки — это использовать пример классической проблемы информатики, известный как проблема обедающих философов.

Постановка проблемы для столовых философов заключается в следующем:

Пять философов сидят за круглым столом с пятью тарелками спагетти (разновидность макарон) и пятью вилками, как показано на схеме.

Столовая Философы Проблема

В любой момент времени философ должен либо есть, либо думать.

Кроме того, философ должен взять две вилки рядом с ним (то есть левую и правую вилки), прежде чем он сможет съесть спагетти. Проблема тупика возникает, когда все пять философов одновременно поднимают свои правильные вилки.

Поскольку у каждого из философов есть одна вилка, они все будут ждать, пока другие положат свою вилку. В результате никто из них не сможет съесть спагетти.

Точно так же в параллельной системе тупик возникает, когда разные потоки или процессы (философы) пытаются одновременно получить общие системные ресурсы (вилки). В результате ни один из процессов не получает возможности для выполнения, поскольку они ожидают другого ресурса, удерживаемого каким-либо другим процессом.

Условия гонки

Состояние гонки — это нежелательное состояние программы, которое возникает, когда система выполняет две или более операций одновременно. Например, рассмотрим этот простой цикл for:

i=0; # a global variable
for x in range(100):
    print(i)
    i+=1;

Если вы создаете n потоков, которые одновременно запускают этот код, вы не сможете определить значение i (которое совместно используется потоками), когда программа завершит выполнение. Это связано с тем, что в реальной среде многопоточности потоки могут перекрываться, и значение i, которое было извлечено и изменено потоком, может меняться между ними, когда какой-то другой поток обращается к нему.

Это два основных класса проблем, которые могут возникнуть в многопоточных или распределенных приложениях Python. В следующем разделе вы узнаете, как преодолеть эту проблему путем синхронизации потоков.

Синхронизация потоков

Чтобы справиться с условиями гонки, взаимоблокировками и другими проблемами, связанными с потоками, модуль потоков предоставляет объект Lock . Идея состоит в том, что когда поток хочет получить доступ к определенному ресурсу, он получает блокировку для этого ресурса. Как только поток блокирует определенный ресурс, никакой другой поток не может получить к нему доступ, пока не будет снята блокировка. В результате изменения ресурса будут атомарными, а условия гонки будут предотвращены.

Блокировка — это низкоуровневый примитив синхронизации, реализованный модулем __thread . В любой момент времени блокировка может находиться в одном из двух состояний: заблокирована или разблокирована. Поддерживает два метода:

  1. приобретать ()

    Когда состояние блокировки разблокировано, вызов метода acqu () изменит состояние на заблокированное и вернет его. Однако, если состояние заблокировано, вызов acqu () блокируется до тех пор, пока метод release () не будет вызван каким-либо другим потоком.

  2. релиз()

    Метод release () используется для установки состояния unlocked, т. Е. Для снятия блокировки. Он может быть вызван любым потоком, не обязательно тем, который получил блокировку.

Вот пример использования блокировок в ваших приложениях. Запустите ваш IDLE и введите следующее:

import threading
lock = threading.Lock()

def first_function():
    for i in range(5):
        lock.acquire()
        print ('lock acquired')
        print ('Executing the first funcion')
        lock.release()

def second_function():
    for i in range(5):
        lock.acquire()
        print ('lock acquired')
        print ('Executing the second funcion')
        lock.release()

if __name__=="__main__":
    thread_one = threading.Thread(target=first_function)
    thread_two = threading.Thread(target=second_function)

    thread_one.start()
    thread_two.start()

    thread_one.join()
    thread_two.join()

Теперь нажмите F5. Вы должны увидеть результат вроде этого:

ОБЪЯСНЕНИЕ КОДА

  1. Здесь вы просто создаете новую блокировку, вызывая функцию фабрики threading.Lock () . Внутренне, Lock () возвращает экземпляр наиболее эффективного конкретного класса Lock, который поддерживается платформой.
  2. В первом утверждении вы получаете блокировку, вызывая метод acqu (). Когда блокировка предоставлена, вы печатаете «блокировка получена» на консоли. Как только весь код, который вы хотите запустить, завершил выполнение, вы снимаете блокировку, вызывая метод release ().

Теория в порядке, но откуда вы знаете, что блокировка действительно работает? Если вы посмотрите на вывод, вы увидите, что каждый из операторов печати печатает ровно одну строку за раз. Напомним, что в более раннем примере выходные данные из print были случайными, потому что несколько потоков обращались к методу print () одновременно. Здесь функция печати вызывается только после получения блокировки. Таким образом, выходные данные отображаются по одному и построчно.

Помимо блокировок, python также поддерживает некоторые другие механизмы для обработки синхронизации потоков, как указано ниже:

  1. RLocks
  2. семафоры
  3. условия
  4. События и
  5. барьеры

Глобальная блокировка интерпретатора (и как с ней бороться)

Прежде чем углубляться в детали GIL-кода Python, давайте определим несколько терминов, которые будут полезны для понимания следующего раздела:

  1. Код с привязкой к ЦП: это относится к любому коду, который будет непосредственно выполняться ЦП.
  2. Код, связанный с вводом / выводом: это может быть любой код, который обращается к файловой системе через ОС
  3. CPython: это эталонная реализация Python и может быть описана как интерпретатор, написанный на C и Python (язык программирования).

Что такое GIL?

Блокировка может использоваться, чтобы убедиться, что только один поток имеет доступ к определенному ресурсу в данный момент времени.

Одной из особенностей Python является то, что он использует глобальную блокировку для каждого процесса интерпретатора, что означает, что каждый процесс рассматривает сам интерпретатор Python как ресурс.

Например, предположим, что вы написали программу на Python, которая использует два потока для выполнения операций как с процессором, так и с операциями ввода-вывода. Когда вы запускаете эту программу, вот что происходит:

  1. Интерпретатор Python создает новый процесс и порождает потоки
  2. Когда поток 1 начинает работать, он сначала получает GIL и блокирует его.
  3. Если поток 2 хочет выполнить сейчас, ему придется ждать освобождения GIL, даже если другой процессор свободен.
  4. Теперь предположим, что поток-1 ожидает операции ввода-вывода. В это время он выпустит GIL, а thread-2 получит его.
  5. После завершения операций ввода-вывода, если поток-1 хочет выполнить сейчас, ему снова придется ждать, пока поток-GIL не будет выпущен потоком-2.

Из-за этого только один поток может получить доступ к интерпретатору в любое время, что означает, что в определенный момент времени будет только один поток, выполняющий код Python.

Это нормально в одноядерном процессоре, потому что он будет использовать квантование времени (см. Первый раздел этого руководства) для обработки потоков. Однако в случае многоядерных процессоров связанная с процессором функция, выполняемая в нескольких потоках, окажет значительное влияние на эффективность программы, поскольку фактически она не будет использовать все доступные ядра одновременно.

Зачем нужен GIL?

Сборщик мусора CPython использует эффективную технику управления памятью, известную как подсчет ссылок. Вот как это работает: Каждый объект в Python имеет счетчик ссылок, который увеличивается, когда он присваивается новому имени переменной или добавляется в контейнер (например, кортежи, списки и т. Д.). Аналогично, счетчик ссылок уменьшается, когда ссылка выходит из области видимости или когда вызывается оператор del. Когда счетчик ссылок объекта достигает 0, он подвергается сборке мусора, и выделенная память освобождается.

Но проблема в том, что переменная подсчета ссылок склонна к условиям гонки, как и любая другая глобальная переменная. Чтобы решить эту проблему, разработчики python решили использовать глобальную блокировку интерпретатора. Другим вариантом было добавить блокировку к каждому объекту, что привело бы к взаимным блокировкам и увеличению накладных расходов от вызовов acqu () и release ().

Следовательно, GIL является существенным ограничением для многопоточных программ на Python, выполняющих тяжелые операции с привязкой к процессору (фактически делая их однопоточными). Если вы хотите использовать в своем приложении несколько процессорных ядер, используйте вместо этого многопроцессорный модуль.

Резюме
  • Python поддерживает 2 модуля для многопоточности:
    1. Модуль __thread : обеспечивает низкоуровневую реализацию для многопоточности и является устаревшим.
    2. Модуль потоков : обеспечивает высокоуровневую реализацию многопоточности и является текущим стандартом.
  • Чтобы создать поток с помощью модуля потоков, вы должны сделать следующее:
    1. Создайте класс, который расширяет класс Thread .
    2. Переопределите его конструктор (__init__).
    3. Переопределите его метод run () .
    4. Создайте объект этого класса.
  • Поток можно выполнить, вызвав метод start () .
  • Метод join () может использоваться для блокировки других потоков, пока этот поток (тот, в котором было вызвано соединение) не завершит выполнение.
  • Состояние гонки возникает, когда несколько потоков одновременно получают доступ или изменяют общий ресурс.
  • Этого можно избежать, синхронизируя потоки.
  • Python поддерживает 6 способов синхронизации потоков:
    1. Замки
    2. RLocks
    3. семафоры
    4. условия
    5. События и
    6. барьеры
  • Блокировки позволяют только определенному потоку, который получил блокировку, войти в критическую секцию.
  • У блокировки есть 2 основных метода:
    1. приобретать () : это устанавливает состояние блокировки в заблокированное. При вызове заблокированного объекта он блокируется до тех пор, пока ресурс не станет свободным.
    2. release () : устанавливает состояние блокировки как разблокированное и возвращает. Если вызывается для разблокированного объекта, он возвращает false.
  • Глобальная блокировка интерпретатора — это механизм, посредством которого одновременно может выполняться только 1 процесс интерпретатора CPython.
  • Он был использован для облегчения подсчета ссылок в сборщике мусора CPythons.
  • Чтобы создавать приложения Python с интенсивными операциями, связанными с процессором, вы должны использовать многопроцессорный модуль

 

Многопоточность в Python. Руководство для начинающих

Многопоточность является основной программирования программного обеспечения на языке высокого уровня. И многопоточность в Python, один из лучших примеров упрощенной реализации потоков.

В программировании программного обеспечения, поток является наименьшей единицей исполнения с независимым набором инструкций. Поток имеет начальную точку, последовательность выполнения, и результат. Он имеет указатель команд, который содержит текущее состояние нити и контролирует то, что выполняется в следующем порядке.

Аналогичным образом, способность процесса выполнять несколько потоков параллельно, называется многопоточностью. В идеале, многопоточный режим может значительно повысить производительность любой программы. И механизм многопоточности в Python очень удобен, которому вы можете быстро научиться.

В данной статье мы научим вас различным методам для создания потоков и осуществлять синхронизацию. Давайте сначала посмотрим на некоторые из основных преимуществ и недостатков многопоточности.

Многопоточность в Python – Плюсы:

  • Многопоточность может значительно улучшить скорость вычислений на многопроцессорных или многоядерных системах, так как каждый процессор или ядро ​​обрабатывает отдельный поток одновременно.
  • Многопоточность позволяет программе оставаться отзывчивой в то время как один поток ожидает ввода, другой работает как графический интерфейс. Это утверждение справедливо как для многопроцессорных или однопроцессорных систем.
  • Все нити процесса имеют доступ к своим глобальным переменным. Если глобальная переменная изменяется в одном потоке, то видима и для других потоков также. Поток может также иметь свои собственные локальные переменные.

Многопоточность в Python – Минусы:

  • На однопроцессорной системе, многопоточность не влияет на скорость вычислений. На самом деле, производительность системы может понизиться из-за накладных расходов на управление потоками.
  • Синхронизация необходима, чтобы избежать взаимного исключения при доступе к общим ресурсам процесса. Это непосредственно приводит к дополнительной памяти и загрузки процессора.
  • Многопоточность увеличивает сложность программы, таким образом, также делает ее трудным для отладки.
  • Повышает вероятность потенциальных тупиков.
  • Это может привести к голоданию, когда поток не получает регулярный доступ к совместно используемым ресурсам. Он потерпит неудачу, чтобы возобновить свою работу.

Ну до сих пор, вы читали теоретические представления о потоках. Если вы новичок в Python, мы бы предложили вам ознакомится с 10 советов быстрого кодирования в Python, которые могут помочь вам в написании кодов многопоточности в Python.

Модули многопоточности в Python для реализации нитей.

Python предлагает два модуля для реализации threads в программах.

  • модуль <thread>
  • модуль <threading>.

Для вашей информации,  модуль <thread> является устаревшим в Python 3 и переименован в модуль <_thread> для обратной совместимости. Но мы объясним оба метода, так как многие пользователи все еще используют унаследованные версии Python.

Основное различие между этими двумя модулями является то, что модуль <thread> реализует нить как функцию. С другой стороны, модуль <threading> предлагает объектно-ориентированный подход для обеспечения возможности создания потоков.

1- Как использовать модуль thread для создания потоков.

Если вы решите применить модуль <thread> в вашей программе, то необходимо использовать следующий метод порождения потоков.

Многопоточность в Python: использование модуля thread

thread.start_new_thread ( function, args[, kwargs] )

 

Этот метод является достаточно простым и эффективным способом создания потоков. Вы можете использовать его для запуска программ в Linux, так и Windows.

Этот метод запускает новый поток и возвращает его идентификатор. Он будет вызывать функцию, заданную в качестве параметра «function» с переданным списком аргументов. Когда <function> возвращает, поток выходит.

Здесь арг является кортеж аргументов; используя пустой кортеж для вызова <function> без каких – либо аргументов. Необязательный параметр < kwargs > определяет словарь ключевых аргументов.

Если <function> завершается с необработанным исключением, печатается трассирование, а затем выходит нить (Это не влияет на другие потоки, они продолжают работать). Используйте приведенный ниже код, чтобы узнать больше о многопоточности.

Основной пример многопоточности в Python.

Многопоточность в Python: вызов функции факториала из потока.

#Пример многопоточности в Python.
#1. Вычислить факториал с помощью рекурсии.
#2. Вызов функции факториала с помощью thread.
 
from thread import start_new_thread
 
threadId = 1
 
def factorial(n):
   global threadId
   if n < 1:   # базовый вариант
       print "%s: %d" % ("Нить", threadId )
       threadId += 1
       return 1
   else:
       returnNumber = n * factorial( n - 1 )  # рекурсивный вызов
       print(str(n) + '! = ' + str(returnNumber))
       return returnNumber
 
start_new_thread(factorial,(5, ))
start_new_thread(factorial,(4, ))
 
c = raw_input("Ждем поток для возврата...\n")

 

После выполнения программы, вывод будет следующим.

Вывод программы.

Многопоточность в Python: вывод в многопоточных программах.

# Многопоточность в Python: вывод программы -
Ждем поток для возврата...
Нить: 1
1! = 1
2! = 2
3! = 6
4! = 24
Нить: 2
1! = 1
2! = 2
3! = 6
4! = 24
5! = 120

 

2 – Как использовать модуль threading для создания темы.

Последний модуль <threading> предоставляет богатые возможности и большую поддержку потоков, чем унаследованный модуль <thread>, рассмотренный в предыдущем разделе. Модуль <threading> является отличным примером многопоточности в Python.

Модуль <threading> объединяет все методы модуля <thread> и предоставляет несколько дополнительных методов.

  • threading.activeCount(): находит общее количество активных объектов thread.
  • threading.currentThread(): вы можете использовать его, чтобы определить количество объектов thread под контролем потока вызывающего потока.
  • threading.enumerate(): даст вам полный список объектов нитей, которые активны в настоящее время.

Помимо перечисленных выше методов, модуль <threading> также представляет класс <Thread>, который вы можете попробовать для реализации thread. Это объектно-ориентированный вариант многопоточности в Python.

Класс <Thread> публикует следующие методы.

Методы класса Метод Описание
run(): Это функция точки входа для любого потока.
start(): Способ start() запускает поток, когда вызывается метод run.
join([time]): Метод join() позволяет программе ожидать оканчиваются.
isAlive(): Метод isAlive() проверяет активную нить.
getName(): Метод getName() возвращает имя потока.
setName(): Метод setName() обновляет имя потока.

 

2.1- Шаги для реализации темы с помощью модуля threading.

Вы можете выполнить следующие шаги, чтобы реализовать новый поток, используя модуль <threading>

  • Построить подкласс от класса <Thread>.
  • Переопределение <__init__(self [,args])> как способ поставки аргументов в соответствии с требованиями.
  • Затем переопределить метод <run(self [,args])> для кодирования бизнес-логики потока.

После того, как вы определите новый подкласс <Thread>, вы должны создать экземпляр, чтобы начать новый поток. Затем, вызовите метод <start()>, чтобы инициировать его. Это в конечном счете вызовет метод <run()> для выполнения бизнес – логики.

Пример – Создание класса потоков и объектов для печати текущей даты.

Многопоточность в Python: пример кода для отображения текущей даты в потоке.

# Многопоточность в Python: пример кода для отображения текущей даты в потоке.
#1. Определить подкласс, используя класс thread.
#2. Создать экземпляр подкласса и вызвет поток.
 
import threading
import datetime
 
class myThread (threading.Thread):
    def __init__(self, name, counter):
        threading.Thread.__init__(self)
        self.threadID = counter
        self.name = name
        self.counter = counter
    def run(self):
        print "Запуск " + self.name
        print_date(self.name, self.counter)
        print "Выход " + self.name
 
def print_date(threadName, counter):
    datefields = []
    today = datetime.date.today()
    datefields.append(today)
    print "%s[%d]: %s" % ( threadName, counter, datefields[0] )
 
# Создание новой нити
thread1 = myThread("Нить", 1)
thread2 = myThread("Нить", 2)
 
# Запуск новой нити
thread1.start()
thread2.start()
 
thread1.join()
thread2.join()
print "Выход из программы!!!"

 

2.2- Python Многопоточность – Синхронизация потоков.

Модуль <threading> имеет встроенные функциональные возможности для осуществления блокировки, что позволяет синхронизировать потоки. Блокировка требуется для управления доступом к общим ресурсам для предотвращения повреждения или непринятых данных.

Вы можете вызвать метод Lock(), чтобы применять блокировки, он возвращает новый объект блокировки. Затем, вы можете вызвать метод acquire(blocking) объекта блокировки для обеспечения синхронного выполнения потоков.

Необязательный параметр blocking указывает, ожидает ли нить получения блокировки.

  • В случае, если blocking устанавливается равным нулю, то поток сразу же возвращается с нулевым значением, если блокировка не может быть получена и с 1, если блокировка была приобретена.
  • В случае, если blocking устанавливается в 1, поток блоков ожидает, пока замок освободиться блокировка.

Метод release() объекта блокировки используется для снятия блокировки, когда он больше не требуется.

Просто для информации, структуры, построенные в Python, такие как списки, словари потокобезопасны как побочный эффект, имеющие атомные байты – код для манипулирования ими. Другие структуры данных, реализованные в Python или основных типов, таких как целые и вещественные числа, не имеют такой защиты. Для защиты от одновременного доступа к объекту, мы используем объект Lock.

Пример многопоточности в Python для демонстрации блокировки.

Многопоточность в Python: пример кода, демонстрации блокировки.

# Многопоточность в Python: пример кода, демонстрации блокировки.
#1. Определить подкласс, используя класс thread.
#2. Создать экземпляр подкласса и вызвать поток.
#3. Реализуем locks в потоке методом run.
 
import threading
import datetime
 
exitFlag = 0
 
class myThread (threading.Thread):
    def __init__(self, name, counter):
        threading.Thread.__init__(self)
        self.threadID = counter
        self.name = name
        self.counter = counter
    def run(self):
        print "Запуск " + self.name
        # Получить lock для синхронизации потоков
        threadLock.acquire()
        print_date(self.name, self.counter)
        # Фиксатор для следующего потока
        threadLock.release()
        print "Выход " + self.name
 
def print_date(threadName, counter):
    datefields = []
    today = datetime.date.today()
    datefields.append(today)
    print "%s[%d]: %s" % ( threadName, counter, datefields[0] )
 
threadLock = threading.Lock()
threads = []
 
# Создание нового потока
thread1 = myThread("Нить", 1)
thread2 = myThread("Нить", 2)
 
# Запуск нового потока
thread1.start()
thread2.start()
 
# Добавлять потоки в список нитей
threads.append(thread1)
threads.append(thread2)
 
# Ждать для всех потоков, чтобы завершить
for t in threads:
    t.join()
print "Выход из программы!!!"

 

Основная информация – Многопоточность в Python. Руководство для начинающих

Мы хотим, чтобы вы нашли это руководство по многопоточности в Python очень интересным и интуитивно понятным. Кроме того, мы добавили несколько примеров в этом руководстве, что, несомненно, поможет вам поднять ваши навыки Python.

Мы также рекомендуем вам практиковать с новыми многопоточными заданиями для достижения лучших результатов, которые будут стимулировать ваши навыки программирования и знания Python.

Если вы нашли ошибку, пожалуйста, выделите фрагмент текста и нажмите Ctrl+Enter.

threading — Потоковый параллелизм — документация Python 3.8.6

Исходный код: Lib / threading.py


Этот модуль конструирует высокоуровневые поточные интерфейсы поверх нижнего
level _thread модуль. См. Также модуль очереди .

Изменено в версии 3.7: раньше этот модуль был необязательным, теперь он всегда доступен.

Примечание

Хотя они не перечислены ниже, имена camelCase , используемые для некоторых
методы и функции этого модуля в Python 2.серия x все еще
поддерживается этим модулем.

Этот модуль определяет следующие функции:

нарезание резьбы. active_count ()

Возвращает количество активных объектов Thread . Вернувшийся
count равно длине списка, возвращаемого функцией enumerate () .

нарезание резьбы. current_thread ()

Вернуть текущий объект Thread , соответствующий потоку вызывающего
контроля.Если поток управления вызывающего абонента не был создан через
threading , фиктивный объект потока с ограниченной функциональностью
вернулся.

нарезание резьбы. за исключением крючка ( args , /)

Обрабатывать неперехваченное исключение, вызванное Thread.run () .

Аргумент args имеет следующие атрибуты:

  • exc_type : тип исключения.

  • exc_value : значение исключения, может быть Нет .

  • exc_traceback : Отслеживание исключения, может быть Нет .

  • поток : поток, вызвавший исключение, может быть Нет .

Если exc_type SystemExit , исключение автоматически игнорируется.
В противном случае исключение распечатывается на sys.stderr .

Если эта функция вызывает исключение, вызывается sys.excepthook () для
справиться.

threading.excepthook () можно переопределить, чтобы контролировать, насколько неперехваченным
обрабатываются исключения, вызванные Thread.run () .

Сохранение exc_value с использованием настраиваемого обработчика может создать ссылочный цикл. Это
должен быть очищен явно, чтобы прервать контрольный цикл, когда
исключение больше не нужно.

Сохранение нити с использованием настраиваемого крючка может воскресить ее, если она установлена ​​на
объект, который дорабатывается.Избегайте хранения нити после пользовательского
крючок завершается, чтобы не воскрешать объекты.

нарезание резьбы. get_ident ()

Вернуть «идентификатор потока» текущего потока. Это ненулевой
целое число. Его значение не имеет прямого значения; он задуман как волшебное печенье
для использования например для индексации словаря данных, специфичных для потока. Нить
идентификаторы могут быть повторно использованы, когда поток завершается, а другой поток
создан.

нарезание резьбы. get_native_id ()

Возвращает собственный интегральный идентификатор потока текущего потока, назначенного ядром.
Это целое неотрицательное число.
Его значение может использоваться для однозначной идентификации этого конкретного потока в масштабе всей системы.
(пока поток не завершится, после чего значение может быть переработано ОС).

Доступность: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX.

нарезание резьбы. перечислить ()

Возвращает список всех Thread объектов, которые в настоящее время активны. Список
включает демонические потоки, объекты фиктивных потоков, созданные
current_thread () и основной поток. Исключает завершенные потоки
и темы, которые еще не были запущены.

нарезание резьбы. main_thread ()

Вернуть основной объект Thread . В нормальных условиях
основной поток — это поток, из которого интерпретатор Python был
начал.

нарезание резьбы. settrace ( функция )

Установите функцию трассировки для всех потоков, запущенных из модуля threading .
Функция func будет передана в sys.settrace () для каждого потока, прежде чем его
run () вызывается метод .

нарезание резьбы. setprofile ( func )

Установите функцию профиля для всех потоков, запускаемых из модуля threading .Функция func будет передана sys.setprofile () для каждого потока, прежде чем его
run () вызывается метод .

нарезание резьбы. stack_size ([ размер ])

.

17,1. threading — параллелизм на основе потоков — документация Python 3.4.10

Исходный код: Lib / threading.py


Этот модуль конструирует высокоуровневые поточные интерфейсы поверх нижнего
level _thread модуль. См. Также модуль очереди.

Модуль dummy_threading предназначен для ситуаций, когда
использование потоковой передачи невозможно, поскольку отсутствует _thread.

Примечание

Хотя они не перечислены ниже, имена camelCase, используемые для некоторых
методы и функции этого модуля в Python 2.серия x все еще
поддерживается этим модулем.

Этот модуль определяет следующие функции:

threading.active_count ()

Возвращает количество активных в настоящее время объектов Thread. Вернувшийся
count равен длине списка, возвращаемого enumerate ().

threading.current_thread ()

Вернуть текущий объект Thread, соответствующий потоку вызывающего
контроля. Если поток управления вызывающего абонента не был создан через
модуль threading, фиктивный объект потока с ограниченной функциональностью
вернулся.

threading.get_ident ()

Вернуть «идентификатор потока» текущего потока. Это ненулевой
целое число. Его значение не имеет прямого значения; он задуман как волшебное печенье
для использования например для индексации словаря данных, специфичных для потока. Нить
идентификаторы могут быть повторно использованы, когда поток завершается, а другой поток
создан.

threading.enumerate ()

Вернуть список всех живых объектов Thread.Список
включает демонические потоки, объекты фиктивных потоков, созданные
current_thread () и основной поток. Исключает завершенные потоки
и темы, которые еще не были запущены.

threading.main_thread ()

Вернуть основной объект Thread. В нормальных условиях
основной поток — это поток, из которого интерпретатор Python был
начал.

threading.settrace ( func )

Установите функцию трассировки для всех потоков, запущенных из модуля threading. func будет передан в sys.settrace () для каждого потока перед его
вызывается метод run ().

threading.setprofile ( func )

Установите функцию профиля для всех потоков, запускаемых из модуля нарезания резьбы.
func будет передан в sys.setprofile () для каждого потока перед его
вызывается метод run ().

threading.stack_size ([ размер ])

Возвращает размер стека потоков, использованный при создании новых потоков.Необязательный
size аргумент определяет размер стека, который будет использоваться для последующего создания
потоков и должен быть 0 (использовать платформу или настроен по умолчанию) или положительный
целочисленное значение не менее 32 768 (32 КБ). Если размер не указан,
0 используется. Если изменение размера стека потоков
неподдерживаемый, возникает ошибка RuntimeError. Если указанный размер стека
недопустим, возникает ошибка ValueError и размер стека не изменяется. 32 КБ
в настоящее время является минимальным поддерживаемым значением размера стека, чтобы гарантировать достаточное
пространство стека для самого интерпретатора.Обратите внимание, что некоторые платформы могут иметь
особые ограничения на значения размера стека, такие как требование
минимальный размер стека> 32 КиБ или требуется выделение, кратное системе
размер страницы памяти — дополнительную информацию см. в документации по платформе.
информации (обычно страницы 4 КиБ; использование кратного 4096 размера стека
предлагаемый подход при отсутствии более конкретной информации).
Доступность: Windows, системы с потоками POSIX.

Этот модуль также определяет следующую константу:

заправка.TIMEOUT_MAX

Максимальное значение, разрешенное для параметра тайм-аут функций блокировки
(Lock.acquire (), RLock.acquire (), Condition.wait () и т. Д.).
Указание тайм-аута больше этого значения вызовет
OverflowError.

Этот модуль определяет ряд классов, которые подробно описаны в разделах
ниже.

Дизайн этого модуля в общих чертах основан на потоковой модели Java. Однако,
где Java делает блокировки и условные переменные основным поведением каждого объекта,
они являются отдельными объектами в Python.Класс Python Thread поддерживает
подмножество поведения Java-класса Thread; в настоящее время нет
приоритеты, нет групп потоков, и потоки не могут быть уничтожены, остановлены,
приостановлено, возобновлено или прервано. Статические методы класса Thread Java,
при реализации отображаются в функции уровня модуля.

Все методы, описанные ниже, выполняются автоматически.

17.1.1. Локальные данные потока

Локальные данные потока — это данные, значения которых зависят от потока.Управлять
локальных данных потока, просто создайте экземпляр локальных (или
подкласс) и сохраните на нем атрибуты:

 mydata = threading.local ()
mydata.x = 1
 

Значения экземпляра будут разными для разных потоков.

.

16,3. thread — несколько потоков управления — документация Python 2.7.18

Примечание

Модуль thread был переименован в _thread в Python 3.
Инструмент 2to3 автоматически адаптирует импорт при преобразовании вашего
исходники для Python 3; однако вам следует рассмотреть возможность использования высокоуровневого
threading модуль вместо этого.

Этот модуль предоставляет низкоуровневые примитивы для работы с несколькими потоками.
(также называемые легковесными процессами или задачами ) — несколько потоков
контролировать совместное использование своего глобального пространства данных.Для синхронизации простые замки
(также называемые мьютексами или двоичными семафорами ).
Модуль threading обеспечивает более простой в использовании и более высокий уровень
API потоковой передачи, построенный поверх этого модуля.

Модуль не является обязательным. Поддерживается в Windows, Linux, SGI IRIX, Solaris.
2.x, а также в системах с потоком POSIX (также известным как «pthread»).
реализация. Для систем, в которых отсутствует модуль thread ,
dummy_thread Доступен модуль .Он дублирует интерфейс этого модуля
и может использоваться в качестве замены.

Он определяет следующие константы и функции:

исключение поток. ошибка

Возникает при ошибках, связанных с потоком.

нить. LockType

Это тип объектов блокировки.

нить. start_new_thread ( функция , args [, kwargs ])

Запустить новый поток и вернуть его идентификатор.Поток выполняет функцию
функция со списком аргументов args (который должен быть кортежем). Необязательный
kwargs аргумент определяет словарь аргументов ключевого слова. Когда функция
возвращается, поток тихо завершается. Когда функция завершается
необработанное исключение, печатается трассировка стека, а затем поток завершается (но
другие потоки продолжают работать).

нить. главное_ прерывание ()

Вызывает исключение KeyboardInterrupt в основном потоке.Подрезка может
используйте эту функцию, чтобы прервать основной поток.

нить. выход ()

Вызов исключения SystemExit . Если не поймать, это вызовет
поток для тихого выхода.

нить. allocate_lock ()

Вернуть новый объект блокировки. Ниже описаны способы блокировки. Замок
изначально разблокирован.

нить. get_ident ()

Вернуть «идентификатор потока» текущего потока. Это ненулевой
целое число. Его значение не имеет прямого значения; он предназначен как волшебное печенье для
использоваться, например, для индексации словаря данных, специфичных для потока. Идентификаторы потоков
может быть повторно использован при выходе из потока и создании другого потока.

нить. stack_size ([ размер ])

Возвращает размер стека потоков, использованный при создании новых потоков.Необязательный
размер аргумент определяет размер стека, который будет использоваться для последующего создания
потоков и должен быть 0 (использовать платформу или настроен по умолчанию) или положительный
целочисленное значение не менее 32 768 (32 КБ). Если размер не указан,
0 используется. Если изменение размера стека потоков
неподдерживаемый, возникает ошибка исключение . Если указанный размер стека
недопустимый, возникает ValueError и размер стека не изменяется. 32 КБ
в настоящее время является минимальным поддерживаемым значением размера стека, чтобы гарантировать достаточное
пространство стека для самого интерпретатора.Обратите внимание, что некоторые платформы могут иметь
особые ограничения на значения размера стека, такие как требование
минимальный размер стека> 32 КБ или требующий распределения в несколько раз больше системы
размер страницы памяти — дополнительную информацию см. в документации по платформе.
информации (обычно страницы размером 4 КБ; использование кратного 4096 размера стека
предлагаемый подход при отсутствии более конкретной информации).
Доступность: Windows, системы с потоками POSIX.

Объекты блокировки имеют следующие методы:

замок. получить ([ waitflag ])

Без необязательного аргумента этот метод получает блокировку безоговорочно, если
необходимо ждать, пока он не будет выпущен другим потоком (только один поток в
время может запереться — вот причина их существования). Если целое число
waitflag присутствует аргумент, действие зависит от его значения: если он равен нулю,
блокировка приобретается только в том случае, если ее можно получить немедленно, не дожидаясь,
в то время как, если оно не равно нулю, блокировка устанавливается безоговорочно, как и раньше.В
возвращаемое значение — Истинно , если блокировка установлена ​​успешно, Ложь в противном случае.

замок. выпуск ()

Открывает блокировку. Замок должен быть приобретен ранее, но не
обязательно по той же теме.

замок. заблокировано ()

Вернуть статус блокировки: Истина , если она была получена каким-то потоком,
Неверно , если нет.

В дополнение к этим методам, объекты блокировки также можно использовать через
с заявлением , например:

 импортная ветка

a_lock = thread.allocate_lock ()

с a_lock:
    print "a_lock заблокирован, пока это выполняется"
 

Предостережения:

.

_thread — API низкоуровневой обработки потоков — документация Python 3.8.6


Этот модуль предоставляет низкоуровневые примитивы для работы с несколькими потоками.
(также называется легких процессов или задач ) — несколько потоков
контролировать совместное использование своего глобального пространства данных. Для синхронизации простые замки
(также называемые мьютексов или двоичных семафоров ).
Модуль threading обеспечивает более простой в использовании и более высокий уровень
API потоковой передачи, построенный поверх этого модуля.

Изменено в версии 3.7: раньше этот модуль был необязательным, теперь он всегда доступен.

Этот модуль определяет следующие константы и функции:

исключение _thread. ошибка

Возникает при ошибках, связанных с потоком.

Изменено в версии 3.3: Теперь это синоним встроенной ошибки RuntimeError .

_thread. Тип замка

Это тип объектов блокировки.

_thread. start_new_thread ( функция , args [, kwargs ])

Запустить новый поток и вернуть его идентификатор. Поток выполняет
function function со списком аргументов args (который должен быть кортежем).
Необязательный аргумент kwargs задает словарь аргументов ключевого слова.

Когда функция возвращается, поток автоматически завершается.

Когда функция завершается с необработанным исключением,
sys.unraisablehook () вызывается для обработки исключения. Объект
Атрибут аргумента ловушки — , функция . По умолчанию трассировка стека
напечатано, а затем поток завершается (но другие потоки продолжают выполняться).

Когда функция вызывает исключение SystemExit , она молча
игнорируется.

_thread. главное_ прерывание ()

Имитация эффекта сигнала .Сигнал SIGINT поступает в основную
нить. Поток может использовать эту функцию для прерывания основного потока.

Если сигнал signal.SIGINT не обрабатывается Python (было установлено значение
signal.SIG_DFL или signal.SIG_IGN ), эта функция выполняет
ничего такого.

_thread. выход ()

Вызывает исключение SystemExit . Если не поймать, это вызовет
поток для тихого выхода.

_thread. allocate_lock ()

Вернуть новый объект блокировки. Ниже описаны способы блокировки. Замок
изначально разблокирован.

_thread. get_ident ()

Вернуть «идентификатор потока» текущего потока. Это ненулевой
целое число. Его значение не имеет прямого значения; он предназначен как волшебное печенье для
использоваться, например, для индексации словаря данных, специфичных для потока. Идентификаторы потоков
может быть повторно использован при выходе из потока и создании другого потока.

_thread. get_native_id ()

Возвращает собственный интегральный идентификатор потока текущего потока, назначенного ядром.
Это целое неотрицательное число.
Его значение может использоваться для однозначной идентификации этого конкретного потока в масштабе всей системы.
(пока поток не завершится, после чего значение может быть переработано ОС).

Доступность: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX.

_thread. размер_стека ([ размер ])

Возвращает размер стека потоков, использованный при создании новых потоков. Необязательный
размер аргумент определяет размер стека, который будет использоваться для последующего создания
потоков и должен быть 0 (использовать платформу или настроен по умолчанию) или положительный
целочисленное значение не менее 32 768 (32 КБ). Если размер не указан,
0 используется. Если изменение размера стека потоков
неподдерживаемый, возникает ошибка RuntimeError . Если указанный размер стека
недопустимый, возникает ValueError и размер стека не изменяется.32 КБ
в настоящее время является минимальным поддерживаемым значением размера стека, чтобы гарантировать достаточное
пространство стека для самого интерпретатора. Обратите внимание, что некоторые платформы могут иметь
особые ограничения на значения размера стека, такие как требование
минимальный размер стека> 32 КиБ или требуется выделение, кратное системе
размер страницы памяти — дополнительную информацию см. в документации по платформе.
информации (обычно страницы 4 КиБ; использование кратного 4096 размера стека
предлагаемый подход при отсутствии более конкретной информации).

Доступность: Windows, системы с потоками POSIX.

_thread. TIMEOUT_MAX

Максимальное допустимое значение параметра timeout
Заблокировать. Получить () . Указание тайм-аута больше этого значения приведет к
вызвать OverflowError .

Объекты блокировки имеют следующие методы:

замок. получить ( waitflag = 1 , timeout = -1 )

Без дополнительных аргументов этот метод получает блокировку безоговорочно, если
необходимо ждать, пока он не будет выпущен другим потоком (только один поток в
время может запереться — вот причина их существования).

Если присутствует целочисленный аргумент waitflag , действие зависит от его
значение: если оно равно нулю, блокировка будет получена, только если она может быть получена
немедленно, не дожидаясь, а если он ненулевой, блокировка будет
безусловно, как указано выше.

Если аргумент тайм-аут с плавающей запятой присутствует и положителен, он
указывает максимальное время ожидания в секундах перед возвратом. Отрицательный
тайм-аут аргумент указывает неограниченное ожидание. Вы не можете указать
таймаут , если waitflag равен нулю.

Возвращаемое значение — Истина , если блокировка получена успешно,
Неверно , если нет.

Изменено в версии 3.2: Параметр timeout является новым.

.

Добавить комментарий

Ваш адрес email не будет опубликован.

2022 © Все права защищены. Карта сайта