Конкурентность в асинхронном приложении на примере twisted

6 Сен
2011

Теоретически, проблема конкурентного доступа не характерна для асинхронных приложений. В отличие от приложений с параллельной архитектурой, в которых в каждый момент времени может выполняться несколько задач претендующих на какой то общий ресурс — в асинхронном приложении в один момент времени выполняется только одна активность.
Но на практике все выглядит немного по иному:

Предстваим следующую ситуацияю. Наш сервис работает с удаленным сервисом (S), который обрабатывает запросы параллельно. У нас есть ресурс A, который зависит от данных на S. Запрос R1 обновляет A, для этого он запрашивает данные с S. В это время приходит R2, он модифицирует соответствующие данные на S. Запрос R2 пришел позже, но изза параллельной архитектуры S — обработался раньше (см. Рис.1).
image
Рис.1. Порядок выполнения запросов.
Теперь R1 получает данные уже с учетом модификаций R2, в то время как ресурс A еще не знает, что это результат операций R2. Вот вам и race condition.
Пример:

У Коли есть жена и счет в банке. Коля с женой решили, что у них в семье равноправие и общий бюджет, а потому у каждого из них есть по карте, привязанной к этому счету. Коля работает по фрилансу и приходит время, когда ему должны заплатить за последний проект.
Жена решает порадовать Колю, и подарить ему на ДР новый iPad2. Встав с утра по раньше, она находит на счету N$! Радостная она заказывает iPad2 мужу и новые часики себе. Когда Коля проверяет счет в банке, там «по прежнему» ноль. Он не догадывается проверить последние операции и долго долго ругается с заказчиком.
Коля — запрос 1, его жена — запрос 2, счет в банке — ресурс на удаленном сервисе, банковские карты — прокси для удаленного ресурса.
На месте Коли, жены и счета в банке могут быть: финансовый сервис, АСУТП и многое другое. Суть конкретно моей проблемы разгласить не могу, так как DNA.
Как бороться.

Бороться будем с применением twisted. Хотя общая концепция верна для любого асинхронного фреймворка.
Очевидно, что решением является блокировка ресурса A. Теоретически — для блокировки подойдут те же конструкции, что и для параллельной архитектуры: semaphor, mutex, conditional variable. Вопрос в реализации. Итак, рассмотрим mutex (почему mutex? потому что conditional variable тривиальна, а для semaphor мне пока не удалось найти применения).
Как сделать mutex?

Вариант нерабочий. В параллельной архитектуре — поток может просто спать, и нам ничего за это не будет. В асинхронной архитектуре — если мы просто уснем (например time.sleep(100) ) — то все встанет колом, и мы никогда ничего не дождемся, так как нам необходимо чтобы eventloop переключился на обработку запроса, заблокировавшего ресурс, а пока мы спим — этого не произойдет.
Вариант неправильный. Можно реализовать через reactor.callLater(1, self.some_method, *args), где self.some_method и есть наш метод, который ожидает окончания блокировки.
Недостатки следующие:
  • Довольно страшный код.
  • Ждем мы целую секунду, а за нее может многое случиться, например некий запрос Rn снова заблокирует A.

Вариант. И, наконец, верный вариант. У нас асинхронное приложение, построенное на Deferred’ах. Для того, чтобы что то в нем произошло — нужно чтобы сработал Deferred. Вывод — блокировку необходимо делать на Deferred’ах.
class Mutex(object):
  
    def __init__(self):
        self.locked = False
        self.waiters = list()

    def acquire(self):
        d = Deferred()
        if self.locked:
            self.waiters.append(d)
        else:
            self.locked = True
            d.callback(True)
        return d

    def release(self):
        self.locked = False
        if self.waiters:
            self.locked = True
            d = self.waiters.pop()
            d.callback(True)

Конструкция проста: если мы хотим получить доступ к ресурсу — мы получаем Deferred. Работать с ресурсом мы начинаем только после того, как Deferred срабатывает. Сам класс отслеживает отработку Deferred’ов, и, как только один из них отрабатывает, он достает из очереди следующий и запускает его.
Приведенный пример реализации не полон, реализацию мьютекса в twisted (DeferredLock) можно посмотреть здесь: http://twistedmatrix.com/trac/browser/tags/releases/twisted-11.0.0/twisted/internet/defer.py. Там же есть и DeferredSemaphore.
Пример использования.

Использовать можно по разному: можно сделать так, что объект сам следит за доступом к нему:
class ImportantObject(object):

    def __init__(self):
         self.lock = defer.DeferredLock()

    def get_lock(self):
         return self.lock.acquire()

    def release_lock(self):
         return self.lock.release()

Или, если объекты выбираются из БД и храняться в сессиях (то есть для каждого запроса — объект ImportantObject с id 1 будет разным), можно сделать пул блокировок:
class Pool(object):
    __metaclass__ = Singleton

    def __init__(self, objects_list):
        self.__objects = dict()
        for o in objects_list:
            self.__objects[o.id] = defer.DeferredLock()

    def acquire(self, o):
        if o.id not in self.__objects:
            self.__objects[o.id] = defer.DeferredLock()
        return self.__objects[o.id].acquire()

    def release(self, o):
        self.__objects[o.id].release()

Pool здесь слегка упрощен, мы «забыли» методы для обновления списка отслеживаемых объектов, чтобы не загромождать статью. Так же мы упустили реализацию Singleton, но найти /сделать ее — не составит труда даже начинающему питонисту.
Ну и наконец:
def multiplex(self, a):

    def get_value_from_remote_service(skipped, a):
        d = some_service.do_long_boring_call(a)
        return d

    def power(result, a):
        return result*a

    d = Pool().acquire(a)
    d.addCallback(get_value_from_remote_service, a)
    d.addCallback(power,a)
    return d

П.С.: К сожалению, документация twisted далеко не исчерпывающая. Более того, покрывает она хорошо если процентов 30 этого фреймворка. Поэтому, когда мной решалась проблема конкурентности — я в течении 3-х дней изобретал различные велосипеды. Пока не догадался посмотреть исходники twisted. Так что общий совет — работая с twisted — больше читайте исходники, там спрятаны велосипеды на все случаи жизни.
Источники:
Официальная документация twisted.
Исходники twisted
По материалам Хабрахабр.



загрузка...

Комментарии:

Наверх