Redis
 sql >> Baza danych >  >> NoSQL >> Redis

Flask by Example — implementacja kolejki zadań Redis

Ta część samouczka opisuje szczegółowo, jak zaimplementować kolejkę zadań Redis do obsługi przetwarzania tekstu.

Aktualizacje:

  • 12.02.2020:Aktualizacja do wersji Pythona 3.8.1 oraz najnowszych wersji Redis, Python Redis i RQ. Zobacz poniżej szczegóły. Wspomnij o błędzie w najnowszej wersji RQ i podaj rozwiązanie. Rozwiązano problem z http przed błędem https.
  • 22.03.2016:Aktualizacja do wersji Pythona 3.5.1 oraz najnowszych wersji Redis, Python Redis i RQ. Zobacz poniżej szczegóły.
  • 22.02.2015:Dodano obsługę Pythona 3.

Bezpłatny bonus: Kliknij tutaj, aby uzyskać dostęp do bezpłatnego samouczka wideo Flask + Python, który pokazuje, jak krok po kroku zbudować aplikację internetową Flask.

Pamiętaj:oto, co tworzymy — aplikacja Flask, która oblicza pary słowo-częstotliwość na podstawie tekstu z danego adresu URL.

  1. Część pierwsza:skonfiguruj lokalne środowisko programistyczne, a następnie wdroż zarówno środowisko pomostowe, jak i produkcyjne w Heroku.
  2. Część druga:skonfiguruj bazę danych PostgreSQL wraz z SQLAlchemy i Alembic do obsługi migracji.
  3. Część trzecia:dodaj logikę zaplecza, aby zeskrobać, a następnie przetworzyć liczbę słów ze strony internetowej za pomocą bibliotek żądań, BeautifulSoup i Natural Language Toolkit (NLTK).
  4. Część czwarta:Zaimplementuj kolejkę zadań Redis do obsługi przetwarzania tekstu. (aktualny )
  5. Część piąta:skonfiguruj Angular na froncie, aby stale odpytywać zaplecze, aby sprawdzić, czy żądanie zostało przetworzone.
  6. Część szósta:Prześlij na serwer pomostowy w Heroku – konfiguracja Redis i szczegółowe omówienie sposobu uruchamiania dwóch procesów (sieciowego i roboczego) na jednym Dyno.
  7. Część siódma:zaktualizuj interfejs, aby był bardziej przyjazny dla użytkownika.
  8. Część ósma:Utwórz niestandardową dyrektywę kątową, aby wyświetlić wykres rozkładu częstotliwości za pomocą JavaScript i D3.

Potrzebujesz kodu? Pobierz go z repozytorium.


Wymagania dotyczące instalacji

Użyte narzędzia:

  • Ponowne (5.0.7)
  • Python Redis (3.4.1)
  • RQ (1.2.2) - prosta biblioteka do tworzenia kolejki zadań

Zacznij od pobrania i zainstalowania Redisa z oficjalnej strony lub przez Homebrew (brew install redis ). Po zainstalowaniu uruchom serwer Redis:

$ redis-server

Następnie zainstaluj Python Redis i RQ w nowym oknie terminala:

$ cd flask-by-example
$ python -m pip install redis==3.4.1 rq==1.2.2
$ python -m pip freeze > requirements.txt


Skonfiguruj pracownika

Zacznijmy od utworzenia procesu roboczego do nasłuchiwania zadań w kolejce. Utwórz nowy plik worker.py i dodaj ten kod:

import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

Tutaj nasłuchiwaliśmy kolejki o nazwie default i nawiązałem połączenie z serwerem Redis na localhost:6379 .

Uruchom to w innym oknie terminala:

$ cd flask-by-example
$ python worker.py
17:01:29 RQ worker started, version 0.5.6
17:01:29
17:01:29 *** Listening on default...

Teraz musimy zaktualizować nasz app.py wysyłać zadania do kolejki…



Zaktualizuj app.py

Dodaj następujące importy do app.py :

from rq import Queue
from rq.job import Job
from worker import conn

Następnie zaktualizuj sekcję konfiguracji:

app = Flask(__name__)
app.config.from_object(os.environ['APP_SETTINGS'])
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

q = Queue(connection=conn)

from models import *

q = Queue(connection=conn) skonfigurować połączenie Redis i zainicjować kolejkę na podstawie tego połączenia.

Przenieś funkcję przetwarzania tekstu z naszej trasy indeksu do nowej funkcji o nazwie count_and_save_words() . Ta funkcja akceptuje jeden argument, adres URL, który przekażemy do niej, gdy wywołamy ją z naszej trasy indeksu.

def count_and_save_words(url):

    errors = []

    try:
        r = requests.get(url)
    except:
        errors.append(
            "Unable to get URL. Please make sure it's valid and try again."
        )
        return {"error": errors}

    # text processing
    raw = BeautifulSoup(r.text).get_text()
    nltk.data.path.append('./nltk_data/')  # set the path
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)

    # remove punctuation, count raw words
    nonPunct = re.compile('.*[A-Za-z].*')
    raw_words = [w for w in text if nonPunct.match(w)]
    raw_word_count = Counter(raw_words)

    # stop words
    no_stop_words = [w for w in raw_words if w.lower() not in stops]
    no_stop_words_count = Counter(no_stop_words)

    # save the results
    try:
        result = Result(
            url=url,
            result_all=raw_word_count,
            result_no_stop_words=no_stop_words_count
        )
        db.session.add(result)
        db.session.commit()
        return result.id
    except:
        errors.append("Unable to add item to database.")
        return {"error": errors}


@app.route('/', methods=['GET', 'POST'])
def index():
    results = {}
    if request.method == "POST":
        # this import solves a rq bug which currently exists
        from app import count_and_save_words

        # get url that the person has entered
        url = request.form['url']
        if not url[:8].startswith(('https://', 'http://')):
            url = 'http://' + url
        job = q.enqueue_call(
            func=count_and_save_words, args=(url,), result_ttl=5000
        )
        print(job.get_id())

    return render_template('index.html', results=results)

Zwróć uwagę na następujący kod:

job = q.enqueue_call(
    func=count_and_save_words, args=(url,), result_ttl=5000
)
print(job.get_id())

Uwaga: Musimy zaimportować count_and_save_words funkcja w naszej funkcji index ponieważ pakiet RQ ma obecnie błąd, w którym nie znajduje funkcji w tym samym module.

Tutaj użyliśmy kolejki, którą zainicjowaliśmy wcześniej i nazwaliśmy enqueue_call() funkcjonować. To dodało nowe zadanie do kolejki i to zadanie uruchomiło count_and_save_words() funkcja z adresem URL jako argumentem. result_ttl=5000 argument line mówi RQ, jak długo trzymać wynik zadania przez - w tym przypadku 5000 sekund. Następnie wyprowadziliśmy identyfikator zadania do terminala. Ten identyfikator jest potrzebny do sprawdzenia, czy zadanie zostało wykonane.

Ustawmy w tym celu nową trasę…



Uzyskaj wyniki

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        return str(job.result), 200
    else:
        return "Nay!", 202

Przetestujmy to.

Uruchom serwer, przejdź do http://localhost:5000/, użyj adresu URL https://realpython.com i pobierz identyfikator zadania z terminala. Następnie użyj tego identyfikatora w punkcie końcowym „/results/”, tj. http://localhost:5000/results/ef600206-3503-4b87-a436-ddd9438f2197.

Dopóki nie upłynęło mniej niż 5000 sekund, zanim sprawdzisz status, powinieneś zobaczyć numer identyfikacyjny, który jest generowany, gdy dodajemy wyniki do bazy danych:

# save the results
try:
    from models import Result
    result = Result(
        url=url,
        result_all=raw_word_count,
        result_no_stop_words=no_stop_words_count
    )
    db.session.add(result)
    db.session.commit()
    return result.id

Teraz zmodyfikujmy nieco trasę, aby zwrócić rzeczywiste wyniki z bazy danych w formacie JSON:

@app.route("/results/<job_key>", methods=['GET'])
def get_results(job_key):

    job = Job.fetch(job_key, connection=conn)

    if job.is_finished:
        result = Result.query.filter_by(id=job.result).first()
        results = sorted(
            result.result_no_stop_words.items(),
            key=operator.itemgetter(1),
            reverse=True
        )[:10]
        return jsonify(results)
    else:
        return "Nay!", 202

Pamiętaj, aby dodać import:

from flask import jsonify

Przetestuj to ponownie. Jeśli wszystko poszło dobrze, powinieneś zobaczyć w przeglądarce coś podobnego do:

[
  [
    "Python", 
    315
  ], 
  [
    "intermediate", 
    167
  ], 
  [
    "python", 
    161
  ], 
  [
    "basics", 
    118
  ], 
  [
    "web-dev", 
    108
  ], 
  [
    "data-science", 
    51
  ], 
  [
    "best-practices", 
    49
  ], 
  [
    "advanced", 
    45
  ], 
  [
    "django", 
    43
  ], 
  [
    "flask", 
    41
  ]
]


Co dalej?

Bezpłatny bonus: Kliknij tutaj, aby uzyskać dostęp do bezpłatnego samouczka wideo Flask + Python, który pokazuje, jak krok po kroku zbudować aplikację internetową Flask.

W części 5 połączymy klienta i serwer, dodając Angular do miksu, aby utworzyć sondę, która będzie wysyłać żądanie co pięć sekund do /results/<job_key> punkt końcowy z prośbą o aktualizacje. Gdy dane będą dostępne, dodamy je do DOM.

Pozdrawiam!

To dzieło współpracy Cam Linke, współzałożyciela Startup Edmonton, i ludzi z Real Pythona



  1. Redis
  2.   
  3. MongoDB
  4.   
  5. Memcached
  6.   
  7. HBase
  8.   
  9. CouchDB
  1. Jak wykonać Persistence Store w Redis?

  2. Ściągawka z optymalizacją pamięci Redis

  3. jak zdobyć wszystkie klucze i wartości w redis w javascript?

  4. Jak skonfigurować JedisConnectionFactory do korzystania z SSL, aby nie otrzymywał błędu:JedisDataException:ERR nieszyfrowane połączenie jest zabronione?

  5. Czy redis na Heroku jest możliwy bez dodatku?