Toly blog

Python and Django notes

Многопоточность в одну строку

Перевод статьи Криса Кила Parallelism in one line

Python имеет ужасную репутацию, когда речь идет о возможности параллельных вычислений. Не обращая внимания на типичные рассуждения о его потоках и GIL (который обычно нормально работает), по-моему реальная проблема многопоточности Python не техническая, а педагогическая. Распространенные руководства о библиотеках threading и multiprocessing в целом неплохие, но тяжеловаты для понимания. Они начинаются с глубоких вещей, и заканчиваются до просто применяемых практик.

Традиционный пример

Беглое ознакомление с первыми результатами поискового запроса на тему “Python threading tutorial” показывает, что почти каждый из них основан на использовании какого-либо вспомогательного класса в связке с модулем Queue.

Типичный пример многопоточности вида поставщик-потребитель:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#Example.py
'''
Standard Producer/Consumer Threading Pattern
'''

import time
import threading
import Queue

class Consumer(threading.Thread):
	def __init__(self, queue):
		threading.Thread.__init__(self)
		self._queue = queue

	def run(self):
		while True:
			# queue.get() blocks the current thread until 
			# an item is retrieved. 
			msg = self._queue.get()
			# Checks if the current message is 
			# the "Poison Pill"
			if isinstance(msg, str) and msg == 'quit':
				# if so, exists the loop
				break
			# "Processes" (or in our case, prints) the queue item	
			print "I'm a thread, and I received %s!!" % msg
		# Always be friendly! 
		print 'Bye byes!'


def Producer():
	# Queue is used to share items between
	# the threads.
	queue = Queue.Queue()

	# Create an instance of the worker
	worker = Consumer(queue)
	# start calls the internal run() method to 
	# kick off the thread
	worker.start()

	# variable to keep track of when we started
	start_time = time.time()
	# While under 5 seconds.. 
	while time.time() - start_time < 5:
		# "Produce" a piece of work and stick it in 
		# the queue for the Consumer to process
		queue.put('something at %s' % time.time())
		# Sleep a bit just to avoid an absurd number of messages
		time.sleep(1)

	# This the "poison pill" method of killing a thread. 
	queue.put('quit')
	# wait for the thread to close down
	worker.join()


if __name__ == '__main__':
	Producer()

Мда… Просматриваются Java’вские корни.

Что ж, я не хочу, что бы у вас создалось впечатление, будто схема поставщик-потребитель плоха для многопоточной разработки - это определенно не так. На самом деле такой способ хорошо подходит для решения множества задач. Однако, я думаю, что это не подходит для ежедневного применения.

Проблемы (на мой взгляд)

Во-первых, вам нужен шаблонный класс, который делает то, что нужно. Во-вторых, вам нужно организовать очередь, согласно которой будут обрабатываться объекты; и наконец, вам нужны методы для входа в очередь и выхода из очереди что бы делать реальную работу (скорее всего, с участием другой очереди, если вы хотите получать обратную связь или сохранять результаты работы).

Больше воркеров, больше задач

Следующее, что вы вероятно сделаете, это пулл воркеров, что бы выжать из Python больше производительности. Ниже приводится измененный код примера из превосходного руководства по многопоточности от IBM. Это достаточно распространенный сценарий, когда вы распределяете задачи получения веб-страниц на несколько потоков.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#Example2.py
'''
A more realistic thread pool example 
'''

import time
import threading
import Queue
import urllib2

class Consumer(threading.Thread):
	def __init__(self, queue):
		threading.Thread.__init__(self)
		self._queue = queue

	def run(self):
		while True:
			content = self._queue.get()
			if isinstance(content, str) and content == 'quit':
				break
			response = urllib2.urlopen(content)
		print 'Bye byes!'


def Producer():
	urls = [
		'http://www.python.org', 'http://www.yahoo.com'
		'http://www.scala.org', 'http://www.google.com'
		# etc.. 
	]
	queue = Queue.Queue()
	worker_threads = build_worker_pool(queue, 4)
	start_time = time.time()

	# Add the urls to process
	for url in urls:
		queue.put(url)	
	# Add the poison pillv
	for worker in worker_threads:
		queue.put('quit')
	for worker in worker_threads:
		worker.join()

	print 'Done! Time taken: {}'.format(time.time() - start_time)

def build_worker_pool(queue, size):
	workers = []
	for _ in range(size):
		worker = Consumer(queue)
		worker.start()
		workers.append(worker)
	return workers

if __name__ == '__main__':
	Producer()

Работает отлично, но посмотрите на весь этот код! Здесь методы инициализации, списки потоков для отслеживания работы, и что хуже всего, если вы склонны к обработке блокировок как и я, куча вызовов метода join. А впоследствии будет еще сложнее!

А что было сделано? Да практически ничего. Вышеприведенный код представляет собой хрупкую конструкцию. Это внимательное следование шаблону, это высокая вероятность ошибок (я даже забыл вызвать метод task_done() в объекте очереди пока писал это), и это писать много кода и получать мало функционала. К счастью, есть гораздо лучший способ.

Знакомьтесь: Map

Map - это класная маленькая функция, а главное, проста для распараллеливания вашего Python кода. Для тех, кто не вкурсе, map заимствована из функциональных языков, вроде Lisp’а. Это функция, которая применяет другую функцию к последовательности, например:

1
2
urls = ['http://www.yahoo.com', 'http://www.reddit.com']
results = map(urllib2.urlopen, urls)

Этот код применяет метод urlopen к каждому элементу переданной последовательности и сохраняет полученные результаты в список. Это более-менее эквивалентно следующему коду:

1
2
3
results = []
for url in urls:
    results.append(urllib2.urlopen(url))

Функция map управляет итерацией последовательности, применяет нужную функцию, и в конце сохраняет все получившиеся результаты в список.

Почему это имеет значение? Потому, что используя определенные библиотеки, map делает использование многопоточности тривиальным!

Функция map с поддержкой многопоточности присутствует в двух библиотеках: multiprocessing, а так же малоизвестная, но неменее замечательная - multiprocessing.dummy.

Отступление: Что это? Никогда не слышал о многопоточном клоне библиотеки multiprocessing под названием dummy? Я тоже не слышал до недавнего времени. Есть всего одно предложение на странице официальной документации библиотеки multiprocessing. И это предложение сводится к “Ах да, эта вещь существует”. Это печально, скажуя вам!

multiprocessing.dummy представляет собой точный аналог модуля multiprocessing. Разница лишь в том, что multiprocessing работает с процессами, а multiprocessing.dummy использует треды (со всеми присущими им ограничениями). Поэтому, все что относится к одной библиотеке, относится и к другой. Это делает переключение между ними довольно простым.

Приступим

Для доступа к map-параллельной функции, сперва нужно импортировать модули в которых она содержится и создать пулл:

1
2
3
4
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

pool = ThreadPool()

Последнее выражение делает то же, что и семистрочная функция build_worker_pool в приведенном ранее примере. А именно, создает кучу доступных воркеров, поготавливает их к выполнению задач, и сохраняет их в переменной, что бы к ним было легко обратиться.

Объекты из пула принимают несколько параметров, но сейчас упоминания стоит только один: processes. Этот параметр устанавливает количество воркеров в пуле. Если оставить это поле пустым, то по умолчанию оно будет равно количеству ядер в вашем процессоре.

В общем случае, если вы используете многопроцессовый пулл для ядро-раздельных задач, то больше ядер означает большуую скорость (я говорю это с многочисленными оговорками). Однако, когда речь идет о многопоточной обработке и делах связанных с сетью, это не так, и будет хорошей идеей поэксперементировать с размером пула.

1
pool = ThreadPool(4) # Sets the pool size to 4

Если вы запустите слишком много потоков, вы затратите больше времени на переключения между ними, чем на полезную работу, так что в этом случае неплох поизменять параметры до тех пор, пока не найдет оптимальный вариант для вашей задачи.

Итак, теперь, когда созданы воркеры и простой способ распараллеливания в наших руках, давайте перепишем загрузку веб-страниц из предыдущего примера.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
	'http://www.python.org',
	'http://www.python.org/about/',
	'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
	'http://www.python.org/doc/',
	'http://www.python.org/download/',
	'http://www.python.org/getit/',
	'http://www.python.org/community/',
	'https://wiki.python.org/moin/',
	'http://planet.python.org/',
	'https://wiki.python.org/moin/LocalUserGroups',
	'http://www.python.org/psf/',
	'http://docs.python.org/devguide/',
	'http://www.python.org/community/awards/'
	# etc.. 
	]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

#close the pool and wait for the work to finish 
pool.close()
pool.join()

Посмотрите на это! Код который на самом деле работает занимает 4 строки, 3 из которых формальны. Функция map сделала то же, что и предыдущий код в 40 строк с такой легкостью! Для проверки я испробовал оба подхода и попробовал различные размеры пула.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
results = []
for url in urls:
	result = urllib2.urlopen(url)
	results.append(result)

# ------- VERSUS ------- # 

# ------- 4 Pool ------- # 
pool = ThreadPool(4)
results = pool.map(urllib2.urlopen, urls)

# ------- 8 Pool ------- # 
pool = ThreadPool(8)
results = pool.map(urllib2.urlopen, urls)

# ------- 13 Pool ------- # 
pool = ThreadPool(13)
results = pool.map(urllib2.urlopen, urls)

Результаты:

1
2
3
4
 						Single thread:  14.4 Seconds
 						       4 Pool:   3.1 Seconds
 						       8 Pool:   1.4 Seconds
 						      13 Pool:   1.3 Seconds

Потрясающе! Это так же показывает, почему полезно поэкспериментировать с размером пула. Любой пулл с более чем 9 воркерами быстро приводит в падению прироста скорости (на этом компе).

Реальный пример №2

Создание миниатюр для тысяч изображений

Теперь давайте сделаем что-нибудь процесорно-раздельное! Довольно распространенная задача у меня на работе - это обработка больших коллекций картинок. Одна из таких задач - создание миниатюр. И это можно распараллелить.

Простая однопроцессная реализация

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import os
from PIL import Image

SIZE = (75, 75)
SAVE_DIRECTORY = 'thumbs'


def get_image_paths(folder):
    return (os.path.join(folder, f)
            for f in os.listdir(folder)
            if 'jpeg' in f)


def create_thumbnail(filename):
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename)
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)


if __name__ == '__main__':
    folder = os.path.abspath('images_path')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    for image in images:
        create_thumbnail(image)

Пример несколько адаптирован, но по сути происходит следующее: каталог с изображениями передается в программу, потом из каталога выбираются все картинки, и наконец создаются миниатюры и сохраняются в отдельный каталог.

На моем компьютере это выполняется за 27.9 секунд для порядка 6000 изображений.

Если мы заменим цикл for параллельной функцией map:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import os
from PIL import Image
from multiprocessing import Pool

SIZE = (75, 75)
SAVE_DIRECTORY = 'thumbs'


def get_image_paths(folder):
    return (os.path.join(folder, f)
            for f in os.listdir(folder)
            if 'jpeg' in f)


def create_thumbnail(filename):
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename)
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)


if __name__ == '__main__':
    folder = os.path.abspath('images_path')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(create_thumbnail, images)
    pool.close()
    pool.join()

5.6 секунд!

Это серъезный прирост для изменения всего лишь нескольких строчек кода. Продакшен версия еще быстрее, так как в ней разделены процессорные задачи и задачи ввода-вывода на отдельные процессы и потоки - обычный рецепт для кода с учетом блокировок.

Так что, так. Распараллеливание в одну (почти) строку.

Comments