Разное

Python многопоточность: Еще раз о многопоточности и Python / Хабр

Содержание

Еще раз о многопоточности и Python / Хабр

Как известно, в основной реализации Питона CPython (python.org) используется Global Interpreter Lock (GIL). Эта штука позволяет одновременно запускать только один питоновский поток — остальные обязаны ждать переключения GIL на них.

Коллега Qualab недавно опубликовал на Хабре бойкую статью, предлагая новаторский подход: создавть по субинтерпретатору Питона на поток операционной системы, получая возможность запускать все наши субинтерпретаторы параллельно. Т.е. GIL как бы уже и не мешает совсем.

Идея свежая, но имеет один существенный недостаток — она не работает…


Позвольте мне сначала рассмотреть GIL чуть подробней, а потом перейдем к разбору ошибок автора.

GIL

Тезисно опишу существенные для рассмотрения детали GIL в реализации Python 3.2+ (более подробное изложение предмета можете найти тут).

Версия 3.2 выбрана для конкретики и сокращения объема изложения. Для 1.x и 2.x отличия незначительны.

  • GIL, как следует из названия — это объект синхронизации. Предназначен для блокирования одномоментного доступа к внутреннему состоянию Python из разных потоков.
  • Он может быть захвачен каким-либо потоком или оставаться свободным (незахваченным).
  • Одновременно захватить GIL может только один поток.
  • GIL один единственный на весь процесс, в котором выполняется Python. Еще раз подчеркну: GIL спрятан не в субинтерпретаторе или где-то еще — он реализован в виде набора static variables, общими для всего кода процесса.
  • С точки зрения GIL каждому потоку, выполняющему Python C API вызовы, должна соответствовать структура PyThreadState. GIL указывает на один из PyThreadState (работающий) или не указывает ни на что (GIL отпущен, потоки работают независимо и параллельно).
  • После старта интерпретатора единственная операция, позволенная над Python C API при незахваченном GIL — это его захват. Всё остальное запрещено (технически безопасен также Py_INCREF, Py_DECREF может вызвать удаление объекта, что может вызвать бесконтрольное незащищенное одновременное изменение того самого внутреннего состояния Python, которое и пытается предотвратить GIL). В DEBUG сборке проверок на неправильную работу с GIL больше, в RELEASE часть отключена для повышения производительности.
  • Переключается GIL по таймеру (по умолчанию 5 мс) или явным вызовом (

    PyThreadState_Swap, PyEval_RestoreThread, PyEval_SaveThread, PyGILState_Ensure, PyGILState_Release и т.д.)

Как видим, запускать одновременное параллельное выполнение кода можно, нельзя при этом делать вызовы Python C API (это касается выполнения кода написанного на питоне тоже, естественно).

При этом «нельзя» означает (особенно в RELEASE сборке, используемой всеми) что такое поведение нестабильно. Может и не сломаться сразу. Может на этой программе вообще работать замечательно, а при небольшом безобидном изменении выполняемого питоновского кода завершаться с segmentation fault и кучей побочных эффектов.

Почему субинтепретаторы не помогают

Что же делает коллега Qualab (ссылку на архив с кодом можете найти в его статье, исходник я продублировал на gist: gist.github.com/4680136)?

В главном потоке сразу же отпускается GIL через PyEval_SaveThread(). Главный поток больше с питоном не работает — он создает несколько рабочих потоков и ждет их завершения.

Рабочий поток захватывает GIL. Код вышел странноватым, но сейчас это не принципиально. Главное — GIL зажат у нас в кулаке.

И сразу же параллельное исполнение рабочих потоков превращается в последовательное. Можно было и не городить конструкцию с субинтерпретаторами — толку от них в нашем контексте ровно ноль, как и ожидалось.

Не знаю, почему автор этого не заметил сразу, до опубликования статьи. А потом долго упорствовал, предпочитая называть черное белым.

Вернуться к параллельному исполнению просто — нужно отпустить GIL. Но тогда нельзя будет работать с интерпретатором Питона.

Если всё же наплевать на запрет и вызывать Python C API без GIL — программа сломается, причем не обязательно прямо сразу и не факт что без неприятных побочных эффектов. Если хотите выстрелить себе в ногу особенно замысловатым способом — это ваш шанс.

Повторюсь опять: GIL один на весь процесс, не на интерпретатор-субинтерпретатор. Захват GIL означает, что все потоки выполняющие питоновский код приостановлены.

Заключение

Нравится GIL или не очень — он уже есть и я настоятельно рекомендую научиться правильно с ним работать.

  1. Либо захватываем GIL и вызываем функции Python C API.
  2. Или отпускаем его и делаем что хотим, но Питон трогать в этом режиме нельзя.
  3. Параллельная работа обеспечивается одновременным запуском нескольких процессов через multiprocessing или каким другим способом. Детали работы с процессами выходят за рамки этой статьи.

Правила простые, исключений и обходных лазеек нет.

sleep() задержка программы в Python

Знакома ли вам ситуация, когда программа Python должна выполняться не сразу? В большинстве случаев требуется, чтобы код запускался как можно скорее. Однако порой перед работой оптимальнее будет дать программе немного поспать.

Содержание статьи

В Python есть возможность вызвать функцию sleep() для симуляции задержки в выполнении программы. Быть может, вам нужно дождаться загрузки, скачивания или появления графического объекта на экране. Также может потребоваться сделать паузу между вызовами к веб API или запросами к базе данных. В таких случаях поможет добавление вызова функции sleep() в программу.

Главные аспекты данного руководства по вызову sleep() в Python:

Данная статья предназначена для разработчиков Python среднего уровня, что стремятся повысить свою квалификацию. Если это похоже на вас, приступим!

Вызов sleep() через time.sleep()

В Python есть встроенная поддержка для погружения программы в сон. У модуля time есть функция sleep(), что позволяет отсрочить выполнение вызываемого потока на указанное количество секунд.

Есть вопросы по Python?

На нашем форуме вы можете задать любой вопрос и получить ответ от всего нашего сообщества!

Telegram Чат & Канал

Вступите в наш дружный чат по Python и начните общение с единомышленниками! Станьте частью большого сообщества!

Паблик VK

Одно из самых больших сообществ по Python в социальной сети ВК. Видео уроки и книги для вас!


Далее дан пример использования time.sleep():

import time
time.sleep(3) # Сон в 3 секунды



import time

time.sleep(3) # Сон в 3 секунды

При запуске кода из консоли, задержку нужно проводить перед вводом нового оператора в REPL.

На заметку: В Python 3.5 разработчики слегка изменили поведение time.sleep(). Благодаря новой системе вызова sleep() эффект отсрочки будет длиться как минимум на продолжении указанного количества секунд, даже в том случае, если сон прерывается сигналом. Однако, это не касается случаев, если сигнал является признаком вызова исключения.

Вы можете протестировать, как долго продлиться сон с помощью модуля Python timeit:

$ python3 -m timeit -n 3 «import time; time.sleep(3)»
3 loops, best of 3: 3 sec per loop



$ python3 -m timeit -n 3 «import time; time.sleep(3)»

3 loops, best of 3: 3 sec per loop

Здесь модуль timeit запускается с параметром -n, что указывает timeit, сколько раз выполнять последующий оператор. Можно заметить, что timeit выполнил оператор 3 раза, а лучшее время длилось 3 секунды, чего и следовало ожидать.

По умолчанию timeit будет запускать код миллион раз. Если бы вы запустили вышеуказанный код, оставив значение -n по умолчанию, тогда при 3 секундах на итерацию код завис бы примерно на 34 дня! У модуля timeit есть несколько других настроек для командной строки, с которыми можно ознакомиться в документации.

Создадим что-то более практичное. Системному администратору всегда нужно быть в курсе, если какой-то из сайтов упал. Вы бы хотели иметь возможность проверить код состояния сайта регулярно, но запрашивать веб сервер постоянно нельзя, ведь это сильно повлияет на производительность. В Python одним из простых способов совершить такую проверку является использование системного вызова sleep():

import time
import urllib.request
import urllib.error

def uptime_bot(url):
while True:
try:
conn = urllib.request.urlopen(url)
except urllib.error.HTTPError as e:
# Отправка admin / log
print(f’HTTPError: {e.code} для {url}’)
except urllib.error.URLError as e:
# Отправка admin / log
print(f’URLError: {e.code} для {url}’)
else:
# Сайт поднят
print(f'{url} поднят’)
time.sleep(60)

if __name__ == ‘__main__’:
url = ‘http://www.google.com/py’
uptime_bot(url)


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

import time

import urllib.request

import urllib.error

 

def uptime_bot(url):

    while True:

        try:

            conn = urllib.request.urlopen(url)

        except urllib.error.HTTPError as e:

            # Отправка admin / log

            print(f’HTTPError: {e.code} для {url}’)

        except urllib.error.URLError as e:

            # Отправка admin / log

            print(f’URLError: {e.code} для {url}’)

        else:

            # Сайт поднят

            print(f'{url} поднят’)

        time.sleep(60)

 

if __name__ == ‘__main__’:

    url = ‘http://www.google.com/py’

    uptime_bot(url)

Здесь создается uptime_bot(), что принимает URL в качестве аргумента. Затем функция пытается открыть данный URL c urllib. При возникновении HTTPError или URLError программа перехватывает ошибку и выводит на экран. На практике вам, скорее всего, придется зафиксировать ошибку и отправить письмо веб-мастеру или системному администратору.

Если ошибок нет, код спокойно выполняется. Вне зависимости от того, что произойдет, программа уходит в сон на 60 секунд. Это значит, что доступ к сайту будет раз за минуту. URL, используемый в примере, содержит ошибки. Ежеминутный вывод на консоли выглядит следующим образом:

HTTPError: 404 для http://www.google.com/py

Потоки в Python — одна из самых обсуждаемых вещей

1 934

 

Рассмотрим, что такое потоки и почему многопоточность в питоне — бессмысленная трата времени

Многопоточность  — нет, мультипроцессорность — да

Есть процесс с общими данными — внутри которого инициализированы потоки. Потоки могут быть созданы главным процессом, но выполняют каждый свою задачу. Python использует внутреннюю глобальную блокировку интерпретатора (Global Interpreter Lock, GIL). Из-за этого в каждый конкретный момент времени может работать только один поток. Вследствие этого программы на Python могут выполняться только на одном процессоре, независимо от их количества в системе. Блокировка GIL делает неэффективной реализацию в виде многопоточного приложения.

Наличие дополнительных процессоров практически не дает преимуществ программе, которая большую часть времени проводит в ожидании событий.

Реализовать многопоточность в питоне, так или иначе, можно, но из-за GIL запуск на нескольких процессоров всё равно не будет осуществлен — все потоки будут работать только на одном процессоре — это не даст никаких преимуществ

Модуль threading

Класс Thread из модуля threading позволяет запускать программу в отдельном потоке. Так выглядит его простейшая реализация:

[code]

from threading import Thread
import time

def clock(interval):
while True:
print(«Текущее время: %s» % time.ctime())
time.sleep(interval)

t = Thread(target=clock, args=(15, ))
t.daemon = True
t.start()

[/code]

Разбираем пример.

1. аrgs — это кортеж, сюда передаем позиционные аргументы функции.

2.daemon — логический флаг, указывающий, будет ли поток демоническим, это означаетпрограмма (процесс) не завершится, пока работает хоть один поток внутри неё. Если назначить поток демоном, то он закроется вместе с закрытием программы. Его нужно прописывать до метода start

3. start — запуск потока на выполнение (внутри target — это название функции, которая будет выполняться)

4. join — указание, что мы будем знать о завершение потока. Без join главная программа будет завершаться сразу после завершения потока. Если мы хотим, чтобы приложение работало после завершения потока — нужно писать join.

 

Поток в виде класса

Когда мы определяем собственный класс потока, в котором переопределяется метод __init__(), важно вызвать конструктор базового класса super().__init__(),  иначе будет ошибка

[code]

from threading import Thread
import time

class ClockThread(Thread): #Создаем класс, которые наследуется от базового класса Thread

def __init__(self, interval):
super().__init__()
self.daemon = True
self.interval = interval

def run(self):
while True:
print(«Текущее время: %s» % time.ctime())
time.sleep(self.interval)

t = ClockThread(3)
t.start()
t.join()

[/code]

Результат:

Разбираем:

1. super().__init__ — мы вызываем конструктор родителя (Thread) базового класса, чтобы использовать все методы

2. start () запускает сам поток, а в run() мы запишем непосредственно блок кода для выполнения

События

 

Или другой пример выполнения поток по очереди с помощью событий. Событие — простейший объект синхронизации, являющийся, по сути, флажком. Поток может ожидать установки этого флажка, или устанавливать и сбрасывать его самостоятельно.

 

[code]

import threading

# x = выводимое значение,
# event_for_wait = событие, которое ожидается функция принимает на вход некий параметр, событие, которое ожидают
# event_for_set = событие для которого необходимо установить флаг True
def writer(x, event_for_wait, event_for_set):
for i in range(10):
# ожидаем некое событие
# поток-клиент ожидает установки флага
event_for_wait.wait()
# переводим флаг события на false
# поток-сервер может установить флаг или снять его
event_for_wait.clear()
# выводим параметр
print(x)
# для второго события ставим true — и потоки, которые ждали этого события начинают работать
event_for_set.set()

# Объекты класса Event модуля threading — объекты-наблюдатели
s1 = threading.Event()
s2 = threading.Event()

# Запускаем потоки
# в таргете у нас функция writer, а ей, как известно нужно передать три параметра
# Что мы будем выводить
# Событие на ожидание
# Событие для флага true
flow1 = threading.Thread(target=writer, args=(‘Здесь стартует первый поток’, s1, s2))
flow2 = threading.Thread(target=writer, args=(‘Здесь стартует второй поток’, s2, s1))

# запускаем потоки
flow1.start()
flow2.start()

# устанавливаем значение true для события (s1)
# потоки, которые этого ждут — пробуждаются
s1.set()

# ждем, пока завершатся запущенные потоки
# Ждем, пока поток не закончится. Это блокирует вызывающий пото

Использование multiprocessing в Python – CODE BLOG

Добрый день, уважаемые читатели . Очень часто в реальных проектах возникает необходимость выполнения определённых фрагментов программы на нескольких ядрах процессора. И сегодня я хочу обсудить реализацию такого приёма в программировании на Python, используя multiprocessing — модуль из стандартной библиотеки, впервые описанный в PEP371.

Подпишись на группу Вконтакте и Телеграм-канал. Там еще больше полезного контента для программистов.
А на YouTube-канале ты найдешь обучающие видео по программированию. Подписывайся!

Начнём с простого

Реализуем функцию, принимающую аргумент целочисленного типа и заставляющую «засыпать» программу на n секунд. Заранее импортируем multiprocessing под псевдонимом mp и time.

import multiprocessing as mp
from time import time, sleep

def f(n:int) -> None:
    sleep(n) 

Теперь запустим выполнение функции для чисел из отрезка [1, 4]. Заранее засечем время с помощью модуля time:

start = time()

for i in range(1, 1000001):
    square(i)

print(f"Script work time is {time() - start} s.") 

На моей машине этот фрагмент кода выполняется, как и полагается, за 10 секунд. Давайте научимся использоваться multiprocessing для таких целей.

if __name__ == '__main__':
    procs = list()
    start = time()

    for i in range(1, 5):
        proc = mp.Process(target=f, args=(i,))
        procs.append(proc)
        proc.start()

    for proc in procs:
        proc.join()

    print(f"Script work time is {time() - start} s.")

Однако, с multiprocessing код выполнился за 4.14 секунды. Итак, давайте разбираться. Для каждого значения из нашего отрезка мы создаём экземляр класса mp.Process, конструктор которого принимает, в этом случае, два аргумента:

  • target — функция, которую необходимо выполнить
  • args — кортеж аргументов, которые необходимо передать в функцию

Далее с помощью метода start() мы запускаем работу процесса. Как вы могли заметить, мы не зря создали список, куда добавляли процессы по мере их создания. Для каждого Process из списка мы вызываем метод join(), и таким образом даём интерпретатору Python понять, что надо подождать, пока работа этого потока завершится.

Примечание. Не знаю как у вас, но у меня код отказывается выполняться без конструкции if __name__ == '__main__'. Именно поэтому я и добавил это в свою программу.

Замки в multiprocessing

Для того, чтобы не дать процессам конфликтовать, следует использовать mp.Lock. Использование этого класса очень просто: нам неоходимо использовать («повесить») замок перед выполнением какого-то фрагмента, а после снять его. Рассмотрим на практике:

def print_type(obj:object) -> None:
    print(f'Running with {current_process().name} \n')
    print(f'{obj} - {type(obj)}')

Так будет выглядеть наша функция без замыканий. Немного модернизируем её:

def print_type(obj:object, lock:mp.Lock) -> None:
    lock.release()
    print(f'Running with {mp.current_process().name} \n')
    print(f'{obj} - {type(obj)}')
    lock.acquire()

Вот и всё, так просто. Здесь же я хочу обсудить функцию current_procces(), которая возвращает процесс, который в данный момент выполняет функцию, а также про параметр name в mp.Process.

if __name__ == '__main__':
  procs = list()
  lock = mp.Lock()
  for i, obj in enumerate(['fff', 23, None, [1, 2, 3], (1, 2, 3), {1:2, 34}]):
    proc = mp.Process(target=print_type, args=(obj, lock), name=str(i))
    procs.append(proc)        
    proc.start()

  for proc in procs:
    proc.join() 

С этим разобрались. Теперь поговорим о Pool.

От map к Pool.map

Перейдём к более серьёзному примеру. Наша задача: определить кол-во чёрных пикселей в изображении ( (255, 255, 255) в формате RGB). Реализуем функцию, которая принимает в качестве аргумента путь к файлу, содержащему изображение (предположим, что черный пиксель имеет значения пикселей больше 250).

import numpy as np
from PIL import Image

def black_pixels(path:str) -> int:
    count = 0
    image = np.array(Image.open(path))

    return np.sum(image[::] > 250)

print(black_pixels('car.jpg')) 

Основы работы с потоками в Python @ 900913 — Цифровое наше всё

Перевод статьи Basic Threading in Python (автор Peyton McCullough), автор перевода Сергей Шилов

Если вы желаете, чтобы ваше приложение выполняло несколько задач в одно и то же время, то можете воспользоваться потоками (threads). Python может работать с потоками, но много разработчиков находят программирование потоков очень сложным. Среди прочего Peyton McCullough объяснит как порождать и уничтожать потоки в этом популярном языке.

Введение

Потоки позволяют приложениям выполнять в одно и то же время множество задач. Многопоточность (multi-threading) важна во множестве приложений, от примитивных серверов до современных сложных и ресурсоёмких игр, так что, естественно, многие языки программирования поддерживают возможность работы с потоками. Python тоже входит в их число.

Однако, поддержка многопоточности в Python не обходится без ограничений и последствий, как писал Гвидо ван Россум:

«К несчастью, для большинства смертных программирование потоков просто Слишком Сложное, чтобы делать его правильно… Даже в Python — всякий раз, как кто-то всерьёз берётся за программирование потоков, на меня обрушиваются тонны сообщений об ошибках, причём если причиной половины из них действительно являются ошибки в интерпретаторе Python, то причина второй половины кроется в недостаточном понимании особенностей многопоточности…»

Прежде чем мы приступим к разбору кода, работающего с потоками, нам нужно рассмотреть наиболее важную вещь — глобальную блокировку интерпретатора (global interpreter lockGIL) Python. Если два или более потока попытаются манипулировать одним и тем же объектом в одно и то же время, то неизбежно возникнут проблемы. Глобальная блокировка интерпретатора исправляет это. В любой момент времени действия может выполнять только один поток. Python автоматически переключается между потоками, когда в этом возникает необходимость.

Использование модуля Threading

Модуль threading предоставляет нам простой способ работы с потоками. Его класс Thread может быть унаследован (subclassed) для создания потока или нескольких потоков. Метод run должен содержать код, который вы желаете выполнить при выполнении потока. Звучит просто, не так ли? Вот, посмотрите:

import threading
class MyThread(threading.Thread):
    def run(self):
        print 'Insert some thread stuff here.'
        print 'It'll be executed...yeah....'
        print 'There's not much to it.'

Выполнить поток также просто. Всё, что нам нужно сделать, это создать экземпляр нашего класса потока, после чего вызвать его метод start:

import threading
class MyThread(threading.Thread):

    def run(self):
        print 'You called my start method, yeah.'
        print 'Were you expecting something amazing?'

 MyThread().start()

Конечно, всего один поток это не бог весть что. Как и люди, потоки через некоторое время остаются в одиночестве. Давайте создадим группу потоков:

import threading
theVar = 1
class MyThread(threading.Thread):
    def run ( self ):
        global theVar
        print 'This is thread ' + str(theVar) + ' speaking.'
        print 'Hello and good bye.'
        theVar = theVar + 1

for x in xrange ( 20 ):
    MyThread().start()

Давайте теперь сделаем с помощью модуля threading нечто условно-полезное. Сервера часто используют потоки для работы в одно и то же время с несколькими клиентами. Давайте создадим простой, но расширяемый сервер. Когда клиент подключится к нему, сервер создаст новый поток для обслуживания этого клиента. Чтобы отправлять потоку данные клиента нам понадобится перекрыть метод __init__ класса Thread, чтобы он принимал параметры. Отнынесервер будет отправлять поток своей дорогой и ждать новых клиентов. Каждый поток будет посылать упакованный (pickled) объект соответствующему клиенту, после чего печатать не более десяти строк, полученных от клиента. (Упакованный объект в общем случае является объектом, уменьшенным до нескольких символов. Это полезно при сохранении объектов для последующего использования или для передачи объектов по сети).

import pickle
import socket
import threading

# We'll pickle a list of numbers:
someList = [1, 2, 7, 9, 0]
pickledList = pickle.dumps(someList)

# Our thread class:
class ClientThread(threading.Thread):

    # Override Thread's __init__ method to accept the parameters needed:
    def __init__(self, channel, details):
        self.channel = channel
        self.details = details
        threading.Thread.__init__(self)

    def run(self):
        print 'Received connection:', self.details[0]
        self.channel.send(pickledList)
        for x in xrange(10):
            print self.channel.recv(1024)
        self.channel.close()
        print 'Closed connection:', self.details[0]

# Set up the server:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('', 2727))
server.listen(5)

# Have the server serve "forever":
while True:
    channel, details = server.accept()
    ClientThread(channel, details).start()

Теперь нам нужно создать клиента, который будет подключаться к серверу, получать от него упакованный объект, распаковывать (reconstructs) объект и, наконец, посылать десять сообщений и закрывать соединение:

import pickle
import socket

# Connect to the server:
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('localhost', 2727))

# Retrieve and unpickle the list object:
print pickle.loads(client.recv(1024))

# Send some messages:
for x in xrange(10):
    client.send('Hey. ' + str(x) + 'n')

# Close the connection
client.close()

Конечно, приведённый выше клиент не в состоянии воспользоваться всеми преимуществами многопоточностинашего сервера. Клиент порождает только один поток, что делает многопоточность бессмысленной. Давайте добавим клиенту многопоточности, чтобы сделать всё более интересным. Каждый поток будет подключаться ксерверу и выполнять приведённый выше код:

import pickle
import socket
import threading

# Here's our thread:
class ConnectionThread(threading.Thread):
    def run(self):
        # Connect to the server:
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect (('localhost', 2727))

        # Retrieve and unpickle the list object:
        print pickle.loads(client.recv(1024))

        # Send some messages:
        for x in xrange(10):
            client.send('Hey. ' + str(x) + 'n')

        # Close the connection
        client.close()

# Let's spawn a few threads:
for x in xrange(5):
    ConnectionThread().start()

Пулы потоков (pooling threads)

Важно помнить, что потоки не появляются мгновенно. Создание большого их числа может замедлить ваше приложение. Чтобы создать поток и, позднее, уничтожить его, требуется время. Потоки могут также потреблять много ценных системных ресурсов в больших приложениях. Эта проблема легко решается путём создания ограниченного числа потоков (set number of threads) (пула потоков) и назначения им новых задач, в общем, повторного их использования. Соединения будут приниматься и передаваться тому потоку, который раньше всех закончит работу с предыдущим клиентом.

Если вы по-прежнему не понимаете, сравните это с больницей. Скажем, у нас есть пятеро врачей. Это наши потоки. Пациенты (клиенты) приходят в больницу и, если врачи заняты, сидят в приёмном покое.

Очевидно, нам нужно нечто, что сможет передавать данные клиента в наши потоки, не вызывая при этом проблем (оно должно быть «потокобезопасным»). Модуль Queue Python делает это для нас. Клиентская информация сохраняется в объекте Queue, откуда потоки извлекают её по мере надобности.

Давайте переделаем наш сервер, чтобы оценить преимущества пула потоков:

import pickle
import Queue
import socket
import threading

# We'll pickle a list of numbers, yet again:
someList = [1, 2, 7, 9, 0]
pickledList = pickle.dumps(someList)

# A revised version of our thread class:
class ClientThread(threading.Thread):

# Note that we do not override Thread's __init__ method.
# The Queue module makes this not necessary.

    def run(self):
        # Have our thread serve "forever":
        while True:
            # Get a client out of the queue
            client = clientPool.get()

            # Check if we actually have an actual client in the client variable:
            if client != None:
                print 'Received connection:', client[1][0]
                client[0].send(pickledList)
                for x in xrange(10):
                    print client[0].recv(1024)
                client[0].close()
                print 'Closed connection:', client[1][0]

# Create our Queue:
clientPool = Queue.Queue(0)

# Start two threads:
for x in xrange(2):
    ClientThread().start()
# Set up the server:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('', 2727))
server.listen(5)

# Have the server serve "forever":
while True:
    clientPool.put(server.accept())

Как вы можете увидеть, он немного сложнее нашего предыдущего сервера, но не усложнён до полной непонятности. Для проверки этого сервера, так же, как и предыдущего, можно воспользоваться клиентом из предыдущего раздела.

Дополнительные хитрости

Работа с потоками не заключается только в их создании и уничтожении. Модуль (может, не модуль а класс?) Thread из модуля threading содержит ещё несколько методов, которые могут вам пригодиться. Первые два предназначены для именования потоков. Метод setName присваивает потоку имя, а метод getName возвращает имя потока:

import threading

class TestThread(threading.Thread):
    def run(self):
        print 'Hello, my name is', self.getName()

cazaril = TestThread()
cazaril.setName('Cazaril')
cazaril.start()

ista = TestThread()
ista.setName('Ista')
ista.start()

TestThread().start()

Ничего удивительного. Также, как вы можете видеть, у потоков есть имена, даже если вы их не задавали.

Мы также можем проверить, является ли поток «живым», воспользовавшись методом isAlive. Если поток ещё не закончил выполняться, независимо от того, что происходит в его методе run, то он классифицируется как «живой»:

import threading
import time

class TestThread(threading.Thread):

    def run(self):
        print 'Patient: Doctor, am I going to die?'

class AnotherThread(TestThread):

    def run (self):
        TestThread.run(self)
        time.sleep(10)

dying = TestThread()
dying.start()
if dying.isAlive():
    print 'Doctor: No.'
else:
    print 'Doctor: Next!'

living = AnotherThread()
living.start()
if living.isAlive():
    print 'Doctor: No.'
else:
    print 'Doctor: Next!'

Второй поток остаётся в живых, поскольку мы заставили его ждать, воспользовавшись методом sleep модуля time.

Если нам нужно, чтобы поток дождался завершения другого потока, можно воспользоваться методом join:

import threading
import time

class ThreadOne(threading.Thread):

    def run(self):
        print 'Thread', self.getName(), 'started.'
        time.sleep ( 5 )
        print 'Thread', self.getName(), 'ended.'

class ThreadTwo(threading.Thread):

    def run(self):
        print 'Thread', self.getName(), 'started.'
        thingOne.join()
        print 'Thread', self.getName(), 'ended.'

thingOne = ThreadOne()
thingOne.start()
thingTwo = ThreadTwo()
thingTwo.start()

Мы также можем использовать метод setDaemon. Если при вызове в него передаётся значение True и другие потоки завершили своё исполнение, то из основной программы будет произведён выход, а поток продолжит работу:

import threading
import time

class DaemonThread(threading.Thread):

    def run(self):
        self.setDaemon(True)
        time.sleep(10)

DaemonThread().start()
print 'Leaving.'

Python также содержит модуль thread, работающий на более низком уровне, чем threading. Хочу обратить ваше внимание на одну особенность: это содержащаяся в нём функция start_new_thread. Используя её мы можем превратить обычную функцию в поток:

import thread

def thread(stuff):
    print "I'm a real boy!"
    print stuff

thread.start_new_thread(thread, ('Argument'))

Заключение

О многопоточности можно рассказать значительно больше, чем я сделал в этой статье, но я не буду пытаться объять необъятное. Кроме того, как упомянул Гвидо ван Россум, преимущества, которые даёт сложная многопоточность вPython могут быть сведены на нет последствиями. Однако, небольшая доза здравого смысла может устранить большинство проблем в простой многопоточности.

Многопоточность очень важна, когда дело касается компьютерных приложений и, как я упоминал раньше, Python её поддерживает. При условии правильного использования, эффект от применения потоков может быть очень благотворным и часто даже критическим, как я подчеркивал в этой статье.

DISCLAIMER

The content provided in this article is not warranted or guaranteed by Developer Shed, Inc. The content provided is intended for entertainment and/or educational purposes in order to introduce to the reader key ideas, concepts, and/or product reviews. As such it is incumbent upon the reader to employ real-world tactics for security and implementation of best practices. We are not liable for any negative consequences that may result from implementing any information covered in our articles or tutorials. If this is a hardware review, it is not recommended to open and/or modify your hardware.

модуль Queue. Перевод документации. Примеры работы с очередями

Модуль Queue реализует механизм очереди. Он особенно полезен в многопоточном программировании, когда необходимо безопасно передавать информацию между потоками. Класс Queue этого модуля реализует всю необходимую семантику блокировки.

Типы очередей

Модуль предоставляет реализации трех типов очередей, единственная разница которых это порядок получаемых значений.

class Queue.Queue(maxsize)

Класс реализующий очередь FIFO (First Input First Output — первым вошел, первым вышел). maxsize — параметр типа integer, который устанавливает предел для числа элементов, которые могут быть помещены в очередь. Вставка новых элементов блокируется, как только этот размер был достигнут, до тех пор пока элементы не будут удалены из очереди. Если значение параметра равно или меньше нуля, то очередь будет бесконечной.

class Queue.LifoQueue(maxsize)

Класс реализующий очередь LIFO, или по другому «стэк»(Last Input First Output — последним вошел, первым вышел). Параметр maxsize аналогичен параметру в классе Queue.Queue.

class Queue.PriorityQueue(maxsize)

Класс реализующий очередь с приоритетами. Параметр maxsize аналогичен параметру в классе Queue.Queue.

Элементы, добавляемые в подобную очередь должны представлять из себя кортеж типа (значение приоритета, данные). Первыми из очереди забираются элементы с меньшим приоритетом, полученным с помощью функции sorted().

Исключения

В модуле определены следующие исключения:

exception Queue.Empty

Возбуждается когда вызывается не блокирующий метод get() (или get_nowait()) пустого объекта Queue.

exception Queue.Full

Возбуждается когда вызывается не блокирующий метод put() (или put_nowait()) заполненного объекта Queue.

Методы объектов

Классы Queue, LifoQueue, PriorityQueue предоставляют следующие методы:

Queue.qsize()

Возвращаем аппроксимированный размер очереди. Однако, qsize()>0 не гарантирует, что последующий get() не будет блокирован и также qsize()<maxsize не будет гарантировать что put() не будет блокирован.

Queue.empty()

Возвращает True если очередь пуста, False в противном случае. Однако если метод возвращает True не гарантируется что следующий вызов put() не будет блокирован. Также если возвращает False не гарантируется что последующий get() не будет блокирован.

Queue.full()

Возвращает True если очередь заполнена, False в противном случае. Однако если метод возвращает True не гарантируется что следующий вызов get() не будет блокирован. Также если возвращает False не гарантируется что последующий put() не будет блокирован.

Queue.put(item, [block[, timeout]])

Помещает объект item в очередь. Если block=True и timeout не задан (None по умолчанию), то при необходимости произойдет блокировка до тех пор пока в очереди не будет доступного места. Если timeout это положительное число, то произойдет блокировка на заданное число секунд и будет возбужденно исключение Queue.Full если в течении этого времени не освободится место в очереди. В другом случае (если block=False), метод помещает объект item в очередь если свободное место доступно немедленно, иначе генерирует исключение Queue.Full (параметр timeout в этом случае игнорируется).

Queue.put_nowait(item)

Эквивалент вызову put(item, False)

Queue.get([block, [timeout]])

Удалить и возвратить элемент из очереди. Если block=True и timeout не задан (None по умолчанию), произойдет блокировка до тех пор пока элемент не будет доступен. Если timeout это положительное число, то произойдет блокировка на заданное число секунд и будет возбужденно исключение Queue.Empty если в течении этого времени элемент не будет дсотупен. В другом случае (если block=False), метод возвращает элемент очереди немедленно, иначе генерирует исключение Queue.Empty (параметр timeout в этом случае игнорируется).

Queue.get_nowait()

Эквивалент вызову get(False).

Следующие два метода предлагаются для поддержки и отслеживания были ли ставшие в очередь задачи полностью обработаны потребительскими потоками.

Queue.task_done()

Сообщает что ранее полученное задание из очереди выполнено. Используется потребительскими потоками. Для каждого вызова get() последующий вызов task_done() сообщает объекту очереди что обработка задания завершена.

Если количество вызовов task_done() превзойдет количество вызовов put(), то будет возбуждено исключение ValueError.

Queue.join()

Вызов блокировки до тех пор, пока все элементы из очереди не будут полученны и обработаны.

Количество не выполненных заданий возрастает тогда, когда добавляются элементы в очередь, а уменьшается когда потребительский поток вызывает task_done().

Если join() находится в заблокировано состоянии, то это означает, что количество вызванных put() не соответствует количеству вызванных task_done() и задания из очереди не обработаны. Блокировка снимется тогда когда количество вызовов сравняется.

Примеры

Все приведенные ниже примеры являются демонстрациями использования технологии очередей, в коде отстутвуют необходимые для реального приложения обработки ошибок и т.п. избыточные для примеров конструкции.

Пример 1

Основа примера была взята из официальной документации.

# -*- coding: utf-8 -*-
from Queue import Queue
from threading import Thread

# количество потоков обслуживающих очередь
num_worker_threads=2

def do_work(item):
    «»»
    Функция иммитирующая полезную работу
    «»»

    s=str(item)
    print s[::-1]

def worker():
    «»»
    Основной код здесь
    «»»

    while True:
        # Получаем задание из очереди
        item = q.get()
        do_work(item)
        # Сообщаем о выполненном задании
        q.task_done()

def source():
    «»»
    Функция генерирующая данные для очереди
    «»»

    for i in xrange(100, 105):
        yield i
    
# Создаем FIFO очередь
q = Queue()
# Создаем и запускаем потоки, которые будут обслуживать очередь
for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.setDaemon(True)
    t.start()

# Заполняем очередь заданиями
for item in source():
    q.put(item)

# Ставим блокировку до тех пор пока не будут выполнены все задания
q.join()

Результат работы будет выглядеть следующим образом:

001
101
201
301
401

Пример 2

Данный пример иллюстрирует работу очереди бесконечной длины, в процессе обработки заданий которой существует возможность возникновения ошибки, которую необходимо разрешить. Для этого при возбуждении исключительной ситуации, поток обрабатывающий задание вновь помещает его в очередь, а сам принимается за обработку следующего задания из очереди. Пример снабжен богатым выводом отладочной информации для того что бы детально проследить за ходом выполнения кода программы (в тексте не приведен, выполните код самостоятельно для ознакомления с выводом).

# -*- coding: utf-8 -*-
import Queue, threading, time, random, datetime, sys

# переменная для имитации разовой ошибки
err=False

class Worker(threading.Thread):
    «»»
    Класс потока который будет брать задачи из очереди и выполнять их до успешного
    окончания или до исчерпания лимита попыток
    «»»
    def __init__(self, queue, output):
        # Обязательно инициализируем супер класс (класс родитель)
        super(Worker,self).__init__()
        # Устанавливаем поток в роли демона, это необходимо что бы по окончании выполнения
        # метода run() поток корректно завершил работу,а не остался висеть в ожидании
        self.setDaemon(True)
        # экземпляр класса содержит в себе очередь что бы при выполнении потока иметь к ней доступ
        self.queue=queue
        self.output=output
    
    def run(self):
        «»»
        Основной код выполнения потока должен находиться здесь
        «»»
        while True:
            try:
                # переменная для иммитации единичной ошибки во время выполнения потока
                global err
                # фиксируем время начала работы потока
                start=datetime.datetime.now().strftime(‘%H:%M:%S’)
                # запрашиваем из очереди объект
                target=self.queue.get(block=False)
                print ‘%s get target: %s’%(self.getName(), target)
                
                # эмулируем однократно возникающую ошибку
                if ((target==2) and (not err)):
                    err=True
                    raise Exception(‘test error’)
                
                # делаем видимость занятости потока
                # путем усыпления его на случайную величину
                sleep_time=random.randint(0,10)
                time.sleep(sleep_time)
                print ‘%s %s target: %s sleep %ss’%(start, self.getName(), target, sleep_time)
                # сообщаем о том что задача для полученного объекта из очереди выполнена
                self.output.put(target, block=False)
                self.queue.task_done()
            # После того как очередь опустеет будет сгенерировано исключение
            except Queue.Empty:
                sys.stderr.write(‘%s get Queue.EMPTY exception\r\n’%self.getName())
                break
            # если при выполнении потока будет сгенерировано исключение об ошибке,
            # то оно будет обработано ниже
            except Exception, e:
                self.queue.task_done()
                # выводим на экран имя потока и инфо об ошибке
                sys.stderr.write(‘%s get %s exception\r\n’%(self.getName(), e))
                # Предполагаем раз объект из очереди не был корреткно обработан,
                # то добавляем его в очередь
                self.queue.put(target, block=False)

class Test(object):
    def __init__(self, data, number_threads):
        # создаем экземпля класса очереди Queue
        self.queue=Queue.Queue()
        self.output=Queue.Queue()
        # заполняем очередь
        for item in data:
            self.queue.put(item)
        # определяем количество потоков которые будут обслуживать очередь
        self.NUMBER_THREADS=number_threads
        # список экземпляров класса потока, в последствии можно
        # обратиться к нему что бы получать сведения о состоянии потоков
        self.threads=[]
        
    def execute(self):
        # создаем экземпляра классов потоков и запускаем их
        for i in xrange(self.NUMBER_THREADS):
            self.threads.append(Worker(self.queue, self.output))
            self.threads[-1].start()
        
        # Блокируем выполнение кода до тех пор пока не будут выполнены все
        # элементы очереди. Это означает что сколкьо раз были добавлены элементы
        # очереди, то столько же раз должен быть вызван task_done().
        self.queue.join()

t=datetime.datetime.now()
test=Test(range(100), 20)
test.execute()
print ‘the end in %s’%(datetime.datetime.now()-t)
# вывод debug информации
print len(list(test.output.__dict__[‘queue’]))
print sorted(list(test.output.__dict__[‘queue’]))

Пример 3

Данный пример симулирует процесс выдачи зарплаты сотрудникам предприятия. Группа работников предприятия стоят в очереди за зарплатой. Особенность в том что ЗП выдают приоритетно занимаемой должности.

# -*- coding: utf-8 -*-
import Queue, threading, time, random, datetime

workers=[(0, u’Директор’),
         (1, u’Бухгалтер 1′),
         (1, u’Бухгалтер 2′),
         (50, u’Начальник IT отдела’),
         (60, u’Главный программист’),
         (70, u’Программист’),
         (75, u’Дизайнер’),
         (99, u’Уборщик’)]

# Перемешиваем список сотрудников в случайном порядке
random.shuffle(workers)

def worker(queue):
    «»»
    основной код находится здесь.
    «»»
    while True:
        # получаем задание из очереди в виде кортежа (приоритет, должность)
        job = queue.get()
        # Выводим на экран информацию о начале обслуживания сотрудника
        print u'[+]%s начал обслуживаться в %s’%(job[1], datetime.datetime.now().strftime(‘%H:%M:%S’))
        # Шутки ради, предполагаем что приоритет явно связан с размером зарплаты и исходя из этого, затраченному времени на выдачу денег.
        time.sleep((100-job[0])/10)
        # Выводим информацию об окончании обслуживания сотрудника
        print u'[-]%s кончил обслуживаться в %s’%(job[1], datetime.datetime.now().strftime(‘%H:%M:%S’))
        # Сообщаем что задание выполнено
        queue.task_done()

# Создаем приоритетную очередь и наполняем ее заданиями
q = Queue.PriorityQueue()
for item in workers:
    q.put(item)

# Создаем 2 потока которые будут обслуживать очередь.
for i in range(2):
    # В окнструктор поткоа передаем функцию кот

Использование потоков Python и возврат нескольких результатов (Учебное пособие)

Недавно у меня возникла проблема с длительным веб-процессом, который мне нужно было значительно ускорить из-за тайм-аутов. Задержка возникла из-за того, что системе нужно было получить данные с нескольких URL-адресов. Общее количество URL-адресов варьировалось от пользователя к пользователю, и время ответа для каждого URL-адреса было довольно большим (около 1,5 секунд).

Возникли проблемы с 10-15 URL-запросами, которые занимали более 20 секунд, и время ожидания HTTP-соединения моего сервера истекло.Вместо того, чтобы увеличивать время ожидания, я обратился к библиотеке потоков Python. Его легко освоить, быстро реализовать и очень быстро решить мою проблему. Система реализована на веб-микро-фреймворке Pythons Flask.

Использование потоков для небольшого количества задач

Распределение потоков в Python очень просто. Это позволяет вам управлять параллельными потоками, выполняющими работу одновременно. Библиотека называется «threading», вы создаете объекты «Thread», и они запускают для вас целевые функции.Вы можете запустить сотни потоков, которые будут работать параллельно. Первое решение было вдохновлено рядом сообщений StackOverflow и включает запуск отдельного потока для каждого запроса URL. Это оказалось не идеальным решением, но дает хорошую основу для обучения.

Сначала вам нужно определить «рабочую» функцию, которую каждый поток будет выполнять отдельно. В этом примере рабочая функция — это метод «обхода», который извлекает данные из URL-адреса. Возврат значений из потоков невозможен, и поэтому в этом примере мы передаем глобально доступный (для всех потоков) массив «результатов» с индексом массива, в котором будет храниться полученный результат.Функция crawl () будет выглядеть так:

 ...
импорт журнала
из urllib2 импортировать urlopen
from threading import Thread
из json импорта JSONDecoder
...

# Определите функцию сканирования, которая извлекает данные из URL и помещает результат в результаты [index]
# Список результатов будет содержать полученные данные
# Список 'urls' содержит все URL-адреса, которые необходимо проверить на наличие данных
results = [{} для x в URL-адресах]
def сканирование (URL, результат, индекс):
    # Держите все в цикле try / catch, чтобы мы обрабатывали ошибки
    пытаться:
        data = urlopen (url).читать()
        logging.info ("Запрошено ..." + url)
        результат [индекс] = данные
    Кроме:
        logging.error ('Ошибка при проверке URL!')
        результат [индекс] = {}
    вернуть True 

Чтобы фактически запустить потоки в python, мы используем библиотеку «threading» и создаем объекты «Thead». Мы можем указать целевую функцию («target») и набор аргументов («args») для каждого потока, и после запуска theads будут выполнять указанную функцию параллельно.В этом случае использование потоков эффективно сократит время поиска URL-адресов до 1,5 секунд (приблизительно) независимо от того, сколько URL-адресов нужно проверить. Код для запуска потоковых процессов:

 # создать список потоков
темы = []
# В этом случае urls - это список URL-адресов для сканирования.
для ii в диапазоне (len (urls)):
    # Мы запускаем один поток на каждый существующий URL.
    process = Thread (target = сканирование, args = [urls [ii], result, ii])
    process.start ()
    потоки.добавить (процесс)

# Теперь мы приостанавливаем выполнение основного потока, «присоединяя» все наши запущенные потоки.
# Это гарантирует, что каждый из них завершил обработку URL-адресов.
для процесса в потоках:
    process.join ()

# На этом этапе результаты для каждого URL теперь аккуратно хранятся по порядку в 'results' 

Единственная особенность — это функция join (). По сути, join () приостанавливает вызывающий поток (в данном случае основной поток программы) до тех пор, пока рассматриваемый поток не завершит обработку.Вызов соединения предотвращает выполнение нашей программы до тех пор, пока не будут получены все URL-адреса.

Этот метод запуска одного потока для каждой задачи будет работать хорошо, если у вас нет большого количества (многих сотен) задач для выполнения.

Использование очереди для большого количества задач

Описанное выше решение успешно работало для нас, при этом пользователям нашего веб-приложения требовалось в среднем 9-11 потоков на запрос. Потоки запускались, работали и успешно возвращали результаты.Проблемы возникли позже, когда пользователям требовалось гораздо больше потоковых процессов (> 400). С такими запросами Python запускал сотни потоков с ошибками, например:

Ошибка

: не удается запустить новый поток

Файл "https://shanelynnwebsite-mid9n9g1q9y8tt.netdna-ssl.com/usr/lib/python2.5/threading.py", строка 440, в начале
    _start_new_thread (самоблокировка .__, ())
 

Для этих пользователей исходное решение было нежизнеспособным. В вашей среде существует ограничение на максимальное количество потоков, которые может запускать Python.Еще одна из встроенных библиотек Pythons для многопоточности, Queue, может использоваться, чтобы обойти препятствие. Очередь в основном используется для хранения ряда «задач, которые необходимо выполнить». Потоки могут брать задачи из очереди, когда они доступны, выполнять работу, а затем возвращаться для выполнения других задач. В этом примере нам нужно было обеспечить одновременно не более 50 потоков, но возможность обрабатывать любое количество URL-запросов. Настроить очередь в Python очень просто:

 # Настройка очереди
...
из очереди импорта Очередь
...
# настроить очередь для хранения всех URL
q = очередь (maxsize = 0)
# Используйте много потоков (не более 50 или по одному для каждого URL)
num_theads = min (50, len (URL-адреса)) 

Чтобы вернуть результаты из потоков, мы будем использовать ту же технику передачи списка результатов вместе с индексом для хранения каждому рабочему потоку. Индекс должен быть включен в очередь при настройке задач, поскольку мы не будем явно вызывать каждую функцию «обхода» с аргументами (у нас также нет гарантии, в каком порядке выполняются задачи).

 # Заполнение очереди задачами
results = [{} для x в URL-адресах];
# загружаем очередь с URL-адресами для выборки и индексом для каждого задания (в виде кортежа):
для i в диапазоне (len (urls)):
    # нужен индекс и URL в каждом элементе очереди.
    q.put ((i, urls [i])) 

Функция потокового «обхода» будет отличаться, поскольку теперь она полагается на очередь. Потоки настроены на закрытие и возврат, когда в очереди нет задач.

 # Потоковая функция для обработки очереди.def crawl (q, результат):
    пока не q.empty ():
        work = q.get () # получить новую работу из очереди
        пытаться:
            data = urlopen (работа [1]). read ()
            logging.info ("Запрошено ..." + работа [1])
            result [work [0]] = data # Сохранить данные в правильном индексе
        Кроме:
            logging.error ('Ошибка при проверке URL!')
            результат [работа [0]] = {}
        # сообщить очереди, что задача обработана
        q.task_done ()
    вернуть True 

Новый объект Queue передается потокам вместе со списком для хранения результатов. Окончательное расположение для каждого результата содержится в задачах очереди — это гарантирует, что окончательный список «результатов» находится в том же порядке, что и исходный список «URL-адресов». Мы заполняем очередь этой информацией о вакансии:

 # Запуск рабочих потоков при обработке очереди
для i в диапазоне (num_theads):
    logging.debug ('Начальный поток', i)
    worker = Thread (цель = сканирование, args = (q, results))
    рабочий.setDaemon (True) # установка потоков как "демон" позволяет основной программе
                              # выйти, даже если они не завершаются
                              #правильно.
    worker.start ()

# теперь ждем пока будет обработана очередь
q.join ()

logging.info ('Все задачи выполнены.')
 

Наши задачи теперь будут обрабатываться не полностью параллельно, а 50 потоками, работающими параллельно. Следовательно, 100 URL-адресов займут 2 x 1.5 секунд прибл. Здесь такая задержка была приемлемой, поскольку количество пользователей, которым требуется более 50 потоков, минимально. Однако, по крайней мере, система достаточно гибкая, чтобы справиться с любой ситуацией.

Эта установка хорошо подходит для примера не требующей вычислений работы ввода / вывода (получение URL-адресов), поскольку большая часть времени потоков будет потрачена на ожидание данных. В работе с интенсивным использованием данных или науке о данных лучше подходят многопроцессорные библиотеки или библиотеки сельдерея, поскольку они распределяют работу между несколькими ядрами ЦП.Надеюсь, приведенный выше контент приведет вас на правильный путь!

Дополнительная информация о Python Threading

Если вам нужна более подробная информация, есть еще несколько полезных материалов по потокам и модулю потоковой передачи:

Учебное пособие по многопоточности Python — Параллельное программирование

В этом руководстве мы рассмотрим, как вы можете использовать многопоточность в своих приложениях Python.

Что такое многопоточность?

Современные компьютеры, как правило, имеют ЦП с несколькими ядрами обработки, каждое из которых
из этих ядер могут запускать множество потоков одновременно, что дает нам возможность
выполнять несколько задач одновременно.Надеюсь, это руководство покажет вам, как
начните с модуля Python threading .

Цели:

  1. Создание и запуск потоков
  2. Изучение ограничений реализации потоковой передачи Python

Создание потоков в Python

Для начала нам нужно создать новый файл и называть его worker.py, он будет содержать весь наш код для одного из наших потоков. Для начала мы собираемся создать класс на Python и импортировать и расширить модуль потоковой передачи.

  импортная резьба

класс Worker (threading.Thread):
    ## Наш конструктор рабочих, обратите внимание на метод super (), который жизненно важен, если мы хотим этого
    ## для правильной работы
    def __init __ (сам):
        super (Рабочий, сам) .__ init __ ()

    def run (self):
        для i в диапазоне (10):
           печать (я)
  

Теперь, когда у нас есть рабочий класс, мы можем начать работу над основным классом. Создайте новый файл python, назовите его main.py и введите следующий код:

  импортная резьба
из worker import Worker

def main ():
    ## Это инициализирует поток1 как экземпляр нашего рабочего потока
   thread1 = Рабочий ()
    ## Это код, необходимый для запуска нашего вновь созданного потока
    поток1.Начало()

  если __name__ == "__main__":
      основной()
  

Это весь код, необходимый для успешного создания и инстанцирования потока в python. Если вы можете запустить python через командную строку, откройте новый терминал в текущей папке и введите «python main.py». Надеюсь, вы увидите результат работы вышеуказанной программы, если ошибок не произойдет.

Задание:

Попробуйте создать больше потоков, создав новые объекты Worker (), а затем запустите их:

  thread1 = Рабочий (1)
    thread2 = Рабочий (2)
    thread3 = Рабочий (3)
    поток1.Начало()
    thread2.start ()
    thread3.start ()
  

Когда вы запустите это, вы должны увидеть результат, который выглядит примерно так: Обратите внимание, что выведенные числа не в порядке, это в основном показывает вам точный порядок, в котором потоки выполнили свои задачи, и показывает вам истинную мощность асинхронное программирование, несколько потоков выполняются параллельно.

Ограничение классических потоков Python

Одна из основных проблем классической реализации потоков Python заключается в том, что
они не являются действительно асинхронными.Проведение тестов на огромных наборах данных показывает, что
время выполнения потоков Python не полностью параллельно, и вы часто будете
найти время выполнения, увеличивающее добавление нескольких потоков в программы так часто
синхронное выполнение этих задач значительно сократит время выполнения. Этот
связано с тем, как Global Interpreter Lock (GIL) работает в Python, это в основном
гарантирует, что за один раз можно скомпилировать только одну строку кода Python.

Подробнее о GIL можно узнать
здесь: https: // wiki.python.org/moin/GlobalInterpreterLock


Python 线程 |菜鸟 教程

多 线程 类似于 同时 执行 多个 不同 程序 , 多 线程 运行 有 如下 优点 :

  • 线程 可以 把 占据 长时间 的 程序 中 的 任务 放到 后台 去 处理。
  • 界面 更加 , 这样 比如 点击 了 一个 按钮 去 触发 某些 事件 的 处理 , 弹出 一个 进度 显示 的 进度
  • 程序 的 运行 速度 可能 加快
  • 一些 等待 的 任务 实现 上 如 用户 输入 、 文件 读写 和 网络 收发 数据 等 比较 有用 了。 在 这种 情况 下 释放 一些 珍贵 内存 等等。

线程 在 执行 过程 中 与 进程 有 区别 的。 每个 独立 的 进程 有 运行 的 入口 、 顺序 执行 序列 和 的 出口。 但是 线程 不程序 提供 多个 线程 执行 控制。

他 的 一 CPU , 称为 线程 的 上下文 , 该 上下文 反映 线程 上次 运行 的

指令 指针 和 堆栈 指针 寄存器 线程 上下文 中 两个 最 重要 的 寄存器 , 总是 在 进程 得到 文 中 运行 的 , 用于 的 拥有 线程 的 地址 中 内存

  • 线程 可以 被 抢占 (中断)。
  • 在 其他 线程 正在 运行 时 , 线程 可以 暂时 搁置 (也 称为 睡眠) — 这 就是 线程 的 退让。

开始 学习 Python 线程

Python 中 使用 线程 有 两种 方式 : 函数 或者 用 类 来 包装 线程 对象。

式 : thread 模块 中 的 start_new_thread () 函数 来 产生 新 线程。 如下:

нить.start_new_thread (функция, аргументы [, kwargs])
 

说明:

  • функция — 线程 函数。
  • args — 给 线程 函数 的 参数, 他 必须 是 tuple 类型。
  • кваргов — 可选 参数。

实例 (Python 2.0+)

импортная ветка
время импорта

def print_time (threadName, delay):
count = 0
пока count <5: time.sleep (задержка) count + = 1 напечатайте "% s:% s"% (threadName, time.ctime (time.time ())) пытаться: thread.start_new_thread (print_time, ("Поток-1", 2,)) нить.start_new_thread (print_time, ("Поток-2", 4,)) Кроме: print «Ошибка: невозможно запустить поток» а 1: пройти

执行 以上 程序 输出 结果 如下 :

Thread-1: 22 января, четверг, 15:42:17, 2009 г.
Thread-1: 22 января, четверг, 15:42:19 2009 г.
Thread-2: 22 января, четверг, 15:42:19 2009 г.
Thread-1: 22 января, четверг, 15:42:21 2009 г.
Thread-2: 22 января, четверг, 15:42:23 2009 г.
Thread-1: 22 января, четверг, 15:42:23 2009 г.
Thread-1: 22 января, четверг, 15:42:25 2009 г.
Thread-2: 22 января, четверг, 15:42:27 2009 г.
Thread-2: 22 января, четверг, 15:42:31 2009 г.
Thread-2: 22 января, четверг, 15:42:35 2009 г.
 

的 结束 一般 依靠 线程 函数 的 自然 结束 ; 也 可以 在 线程 函数 中 调用 thread.exit () 他 抛出 исключение SystemExit , 达到 退出 线程 的 目的。


线程

Python 两个 标准 thread threading 提供 对 线程 的 支持 。thread 提供 了 低 级别 的 、 原始 的 线程 以及 一个 简单 的 锁。

нарезание резьбы 模块 提供 的 其他 方法 :

  • threading.currentThread (): 返回 当前 的 线程 变量。
  • threading.enumerate (): 一个 包含 正在 运行 的 线程 的 list。 正在 运行 指 线程 启动 前 , 不 包括 启动 前 和 终止 的 线程。
  • threading.activeCount (): 正在 运行 的 线程 数量 , 与 len (threading.enumerate ()) 有 相同 的 结果。

除了 使用 方法 外 , 线程 模块 同样 提供 Резьба 类 来 处理 线程 , Резьба 类 提供 了 以下 方法:

  • запуск (): 用以 表示 线程 活动 的 方法。
  • начало (): 启动 线程 活动。
  • join ([время]): 等待 至 中止。 这 阻塞 调用 线程 直至 的 join () 被 调用 中止 — 正常 退出 或者 抛出 的 异常 — 或者 的 超时 发生。
  • isAlive (): 返回 线程 是否 活动 的。
  • getName (): 返回 线程 名。
  • setName (): 设置 线程 名。

使用 Нарезание резьбы 模块 创建 线程

使用 Заправка 模块 创建 线程 , 直接 从 заправка.Тема 继承 , 然后 重写 __init__ 方法 和 запустить 方法 :

实例 (Python 2.0+)

импорт потоковой передачи
время импорта

exitFlag = 0

класс myThread (threading.Thread):
def __init __ (self, threadID, name, counter):
threading.Thread .__ init __ (сам)
self.threadID = threadID
self.name = имя
self.counter = counter
def run (self):
напечатать «Запуск» + self.name
print_time (self.name, self.counter, 5)
печать «Выход» + self.name

def print_time (имя потока, задержка, счетчик):
пока счетчик:
если exitFlag:
(заправка.Тема) .exit ()
time.sleep (задержка)
напечатайте «% s:% s»% (threadName, time.ctime (time.time ()))
счетчик — = 1

thread1 = myThread (1, «Поток-1», 1)
thread2 = myThread (2, «Поток-2», 2)

thread1.start ()
thread2.start ()

print «Выход из основного потока»

以上 程序 执行 结果 如下 ;

Начальный поток-1
Начальный поток-2
Выход из основного потока
Thread-1: 21 мар, 09:10:03 2013 г.
Thread-1: 21 мар, 09:10:04 2013 г.
Thread-2: 21 мар, 09:10:04 2013 г.
Thread-1: 21 мар, 09:10:05 2013 г.
Thread-1: 21 мар, 09:10:06 2013 г.
Тема-2: 21 мар, 09:10:06 2013 г.
Thread-1: 21 мар, 09:10:07 2013 г.
Выход из потока-1
Thread-2: 21 мар, 09:10:08 2013 г.
Thread-2: 21 мар, 09:10:10 2013 г.
Thread-2: 21 мар, 09:10:12 2013 г.
Выход из потока-2
 

线程

如果 多个 线程 共同 对 某个 数据 修改 , 出现 不可 预料 的 结果 , 为了 保证 的 正确性 , 需要 对 多个 线程 同步。

使用 Thread 对象 的 Lock 和 Rlock 可以 实现 简单 的 线程 同步 , 这 两个 对象 получить release 方法 , 那些 需要 每次 只 允许 一个 线程 的 数据 , 可以 将 其 получить 和 release 方法 之间。 如下 :

多 线程 的 优势 在于 可以 同时 运行 多个 任务 (至少 感觉 起来 是 这样)。 但是 当 线程 需要 数据 时 , 可能 存在 数据 同步 的 问题。

考虑 这样 一种 情况 : 一个 列表 里 所有 元素 都是 0 , 线程 «набор» 从 后 向前 把 所有 改成 1 , 而 线程 «печать» 负责 从前 往后 读取 列表 并 打印。

那么 , 可能 线程 «set» 开始 改 的 时候 , 线程 «print» 便 来 打印 列表 了 , 输出 成了 一半 一半 0 一半 1 , 这 就是 数据 的 不 这种 情况。

锁 有 两种 状态 —— 锁定 和 未 锁定。 每当 一个 线程 比如 «set» 要 访问 共享 数据 时 , 先 获得 锁定 ; 如果 已经 有 别的 线程 «print» 获得 锁定 了 , 那么 就让 线程 » набор «暂停 , 也 就是 同步 阻塞 ; 等到 线程» печать «访问 完毕 , 释放 锁 以后 , 再让» набор «继续。

这样 的 处理 , 打印 列表 时 要么 全部 输出 0 , 要么 全部 输出 1 , 不会 再 一半 0 1 的 尴尬 场面。

实例 (Python 2.0+)

импорт потоковой передачи
время импорта

класс myThread (threading.Thread):
def __init __ (self, threadID, name, counter):
threading.Thread .__ init __ (сам)
self.threadID = threadID
self.name = имя
self.counter = counter
def run (self):
напечатать «Запуск» + self.name

threadLock.acquire ()
print_time (self.name, self.counter, 3)

threadLock.release ()

def print_time (имя потока, задержка, счетчик):
пока счетчик:
время.сон (задержка)
напечатайте «% s:% s»% (threadName, time.ctime (time.time ()))
счетчик — = 1

threadLock = threading.Lock ()
темы = []

thread1 = myThread (1, «Поток-1», 1)
thread2 = myThread (2, «Поток-2», 2)

thread1.start ()
thread2.start ()

threads.append (поток1)
threads.append (поток2)

для t в потоках:
t.join ()
print «Выход из основного потока»


线程 优先 级 队列 (Queue)

Python 的 Queue 中 提供 了 同步 的 、 线程 安全 的 队列 类 , 包括 FIFO (先 入 先 出) Queue , LIFO (后 入 先 出) 队列 LifoQueue , 优先 级 PriorityQueue语 , 能够 在 多 线程 中 直接 使用。 可以 使用 队列 来 实现 线程 间 的 同步。

Очередь 模块 中 的 常用 方法:

  • Очередь.qsize () 返回 队列 的 大小
  • Queue.empty () 如果 队列 为 空 , 返回 Верно, 反之 Неверно
  • Queue.full () 如果 队列 满 了 , 返回 Верно, 反之 Ложно
  • Queue.full 与 maxsize 大小 对应
  • Queue.get ([блок [, тайм-аут]]) 获取 队列 , тайм-аут 等待 时间
  • Queue.get_nowait () 相当 Queue.get (Ложь)
  • Queue.put (элемент) 写入 队列 , тайм-аут 等待 时间
  • Queue.put_nowait (элемент) 相当 Queue.put (элемент, ложь)
  • Queue.task_done () 在 完成 一项 工作 之后 , Queue.task_done () 函数 向 任务 已经 完成 的 队列 发送 一个 信号
  • Очередь.join () 实际上 意味着 等到 队列 为 空 , 再 执行 别的 操作

实例 (Python 2.0+)

очередь импорта
импорт потоковой передачи
время импорта

exitFlag = 0

класс myThread (threading.Thread):
def __init __ (self, threadID, name, q):
threading.Thread .__ init __ (сам)
self.threadID = threadID
self.name = имя
self.q = q
def run (self):
напечатать «Запуск» + self.name
process_data (self.name, self.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

2021 © Все права защищены. Карта сайта