Пишем многопоточные приложения на python for fun and profit

от
Прочее

Давеча возникла необходимость использовать парсер веб страниц с поддержкой исполнения Java Script, Grab framework с задачей не справился (вообще то там есть такая фишка как транспорты с поддержкой того же селениума, однако какой-либо документации, увы, нет). Поэтому как инструмент для распознавания был выбран selenium framework + браузер PhantomJS. В отличии от grab у Selenium нет функционала для асинхронных (или хотя бы многопоточных) запросов, поэтому мы и будем реализовывать данный функционал вручную.
Писать мы будем чекер биткоинов принимающий как входной параметр текстовый файл с ключами и адресами в формате ключ адрес (через пробел). Для проверки будем использовать встроенный функционал DuckDuck.gо Все гуды(кошельки с балансом) будем писать в файл good.txt

  1. import threading
  2. import time
  3. from selenium import webdriver
  4. from Queue import Queue
  5. from sys import argv
  6. from time import sleep
  7.  
  8. THREADS_COUNT = 20  # Задаём количество потоков
  9. queue = Queue()  # Инициализация объекта типа Queue, служит для создания очереди тасков и доступа к ним.
  10. LOCK = threading.RLock()  # Замок, служит для блокирования доступа к блоку кода только для одного потока одновременно, от объекта Lock() отличается тем, что один и тот же поток может пользоваться  
  11. этим же кодом без блокирования.


Итак, на этом этапе особых вопросов возникнуть не должно, собственно использование Queue и Замка станет понятно чуть-чуть позднее.

  1. def parse():
  2.     global queue  # Для доступа к глобальной переменной, в принципе не обязательно, но более наглядно
  3.    out = []  # Список гудов
  4.    driver = webdriver.PhantomJS()  # Инициализируем webdriver для пользования браузером PhantomJS
  5.    # Важное замечание, PhantomJS должен быть либо в директории с .py файлом, либо в Path
  6.    while True:  # Основной цикл
  7.        try:  # Если очередь заданий пуста цикл перервётся
  8.            adress = queue.get_nowait()  # Получаем задание из очереди
  9.        except:
  10.             break
  11.         try:
  12.             driver.get('https://duckduckgo.com/?q=' + adress.split(' ')[1])
  13.         except:  # Коль что-то не заладилось вернём в очередь и попробуем снова позже
  14.            sleep(5)  # Поспим, авось всё наладится
  15.            queue.put(adress)
  16.             continue
  17.  
  18.         try:  # Если бабки есть, пишем адресс в гуды, иначе выводим что кошель не существует
  19.            print(driver.find_element_by_xpath('//td[@class="record__cell  record__cell--value"]').text)
  20.             if (driver.find_element_by_xpath('//td[@class="record__cell  record__cell--value"]').text) != '0 BTC':
  21.                 out.append(adress)
  22.         except:
  23.             print('Dont exist')
  24.     write(out)  # Пишем гуды в файл
  25.    driver.close()  # Закрываем браузер, он своё отработал, нефиг в памяти висеть
  26.  
  27. def write(string):
  28.     global LOCK
  29.     LOCK.acquire()  # Накидываем замок
  30.    for s in string:
  31.         result_file = open('good.txt','a')
  32.         result_file.write(s+'\n')
  33.         result_file.close()
  34.     LOCK.release()  # снимем его

Итак, Queue можно назвать своеобразным массивом-очередью с какими-либо данными, при помощи метода get_nowait() мы можем получить новое задание, а благодаря методу put() можем поместить наше задание в очередь.
Теперь поговорим про RLock. Точнее про то для чего он нужен. Итак, допустим такую гипотетическую ситуацию, два потока одновременно завершают исполнение и одновременно обращаются к функции write.
Что же произойдёт? Писать абсолютно одновременно, конечно, у них не получится, но данные это всё равно не спасёт и в итоге вместо кошельков мы получим кашу из символов, не очень приятно, да? Для избежания такой ситуации мы и используем RLock, он замыкает функцию на время исполнения одним потоком, так что другие потоки не могут к ней обратиться. То что нам и нужно.

  1. print "STARTED"
  2. global THREADS_COUNT
  3. input_file = open(argv[1])
  4. for line in input_file:
  5.     queue.put(line.replace('\n',''))  # Заполняем очередь
  6. for _ in xrange(THREADS_COUNT):  # Запускаем треды
  7.    thread_=threading.Thread(target=parse)
  8.     thread_.start()
  9. while threading.active_count() >1:  # Пока активны дочерние треды, не стоит закрывать основной
  10.    time.sleep(1)
  11. print "FINISHED"

Вот собственно и всё. Всем добра.
+2   3   1
2658