Improve concurrency

- Update Python to use async/await
- Use asyncpg
- Use connection pool
eyusupov/concurrency
Eldar Yusupov 5 years ago
parent 66d17625d8
commit a92bf6288c

@ -1,4 +1,4 @@
FROM python:3.4 FROM python:3.7
RUN mkdir -p /usr/src/app RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app WORKDIR /usr/src/app

@ -1,6 +1,6 @@
tornado==4.4.2 asyncpg==0.19.0
sqlalchemy==1.1.5 Click==7.0
mercantile==0.9.0 mercantile==1.1.2
pyproj==1.9.5.1 pyproj==2.4.0
pyyaml==3.12 PyYAML==5.1.2
psycopg2==2.6.2 tornado==6.0.3

@ -1,47 +1,38 @@
import tornado.ioloop import asyncpg
import tornado.web
import io import io
import os import os
from sqlalchemy import Column, ForeignKey, Integer, String import logging
from sqlalchemy.ext.declarative import declarative_base import tornado
from sqlalchemy.orm import relationship import tornado.web
from sqlalchemy import create_engine from tornado.log import enable_pretty_logging
from sqlalchemy import inspect
from sqlalchemy import text
from sqlalchemy.orm import sessionmaker
import mercantile import mercantile
import pyproj import pyproj
import yaml import yaml
import sys import sys
import itertools import itertools
log = logging.getLogger('tornado.application')
def GetTM2Source(file): def GetTM2Source(file):
with open(file,'r') as stream: with open(file,'r') as stream:
tm2source = yaml.load(stream) tm2source = yaml.load(stream)
return tm2source return tm2source
def GeneratePrepared(layers): def GenerateFunction(layers):
queries = [] queries = []
prepared = "PREPARE gettile(geometry, numeric, numeric, numeric) AS " function = "CREATE OR REPLACE FUNCTION gettile(geometry, numeric, numeric, numeric) RETURNS SETOF bytea AS $$"
for layer in layers['Layer']: for layer in layers['Layer']:
layer_query = layer['Datasource']['table'].lstrip().rstrip() # Remove lead and trailing whitespace layer_query = layer['Datasource']['table'].strip()
layer_query = layer_query[1:len(layer_query)-6] # Remove enough characters to remove first and last () and "AS t" layer_query = layer_query[1:len(layer_query)-6] # Remove enough characters to remove first and last () and "AS t"
layer_query = layer_query.replace("geometry", "ST_AsMVTGeom(geometry,!bbox!,4096,0,true) AS mvtgeometry") layer_query = layer_query.replace("geometry", "ST_AsMVTGeom(geometry,!bbox!,4096,0,true) AS mvtgeometry")
base_query = "SELECT ST_ASMVT('"+layer['id']+"', 4096, 'mvtgeometry', tile) FROM ("+layer_query+" WHERE ST_AsMVTGeom(geometry, !bbox!,4096,0,true) IS NOT NULL) AS tile" base_query = "SELECT ST_ASMVT('"+layer['id']+"', 4096, 'mvtgeometry', tile) FROM ("+layer_query+" WHERE ST_AsMVTGeom(geometry, !bbox!,4096,0,true) IS NOT NULL) AS tile"
queries.append(base_query.replace("!bbox!","$1").replace("!scale_denominator!","$2").replace("!pixel_width!","$3").replace("!pixel_height!","$4")) queries.append(base_query.replace("!bbox!","$1").replace("!scale_denominator!","$2").replace("!pixel_width!","$3").replace("!pixel_height!","$4"))
prepared = prepared + " UNION ALL ".join(queries) + ";" function = function + " UNION ALL ".join(queries) + ";$$ LANGUAGE SQL"
print(prepared) print(function)
return(prepared) return(function)
layers = GetTM2Source("/mapping/data.yml") dsn = 'postgresql://'+os.getenv('POSTGRES_USER','openmaptiles')+':'+os.getenv('POSTGRES_PASSWORD','openmaptiles')+'@'+os.getenv('POSTGRES_HOST','postgres')+':'+os.getenv('POSTGRES_PORT','5432')+'/'+os.getenv('POSTGRES_DB','openmaptiles')
prepared = GeneratePrepared(layers)
engine = create_engine('postgresql://'+os.getenv('POSTGRES_USER','openmaptiles')+':'+os.getenv('POSTGRES_PASSWORD','openmaptiles')+'@'+os.getenv('POSTGRES_HOST','postgres')+':'+os.getenv('POSTGRES_PORT','5432')+'/'+os.getenv('POSTGRES_DB','openmaptiles'))
inspector = inspect(engine)
DBSession = sessionmaker(bind=engine)
session = DBSession()
session.execute(prepared)
def bounds(zoom,x,y): def bounds(zoom,x,y):
inProj = pyproj.Proj(init='epsg:4326') inProj = pyproj.Proj(init='epsg:4326')
@ -62,7 +53,7 @@ def zoom_to_scale_denom(zoom): # For !scale_denominator!
def replace_tokens(query,s,w,n,e,scale_denom): def replace_tokens(query,s,w,n,e,scale_denom):
return query.replace("!bbox!","ST_MakeBox2D(ST_Point("+w+", "+s+"), ST_Point("+e+", "+n+"))").replace("!scale_denominator!",scale_denom).replace("!pixel_width!","256").replace("!pixel_height!","256") return query.replace("!bbox!","ST_MakeBox2D(ST_Point("+w+", "+s+"), ST_Point("+e+", "+n+"))").replace("!scale_denominator!",scale_denom).replace("!pixel_width!","256").replace("!pixel_height!","256")
def get_mvt(zoom,x,y): async def get_mvt(connection,zoom,x,y):
try: # Sanitize the inputs try: # Sanitize the inputs
sani_zoom,sani_x,sani_y = float(zoom),float(x),float(y) sani_zoom,sani_x,sani_y = float(zoom),float(x),float(y)
del zoom,x,y del zoom,x,y
@ -73,30 +64,45 @@ def get_mvt(zoom,x,y):
scale_denom = zoom_to_scale_denom(sani_zoom) scale_denom = zoom_to_scale_denom(sani_zoom)
tilebounds = bounds(sani_zoom,sani_x,sani_y) tilebounds = bounds(sani_zoom,sani_x,sani_y)
s,w,n,e = str(tilebounds['s']),str(tilebounds['w']),str(tilebounds['n']),str(tilebounds['e']) s,w,n,e = str(tilebounds['s']),str(tilebounds['w']),str(tilebounds['n']),str(tilebounds['e'])
final_query = "EXECUTE gettile(!bbox!, !scale_denominator!, !pixel_width!, !pixel_height!);" final_query = "SELECT gettile(!bbox!, !scale_denominator!, !pixel_width!, !pixel_height!);"
sent_query = replace_tokens(final_query,s,w,n,e,scale_denom) sent_query = replace_tokens(final_query,s,w,n,e,scale_denom)
response = list(session.execute(sent_query)) log.info(sent_query)
print(sent_query) response = await connection.fetch(sent_query)
layers = filter(None,list(itertools.chain.from_iterable(response))) layers = filter(None,list(itertools.chain.from_iterable(response)))
final_tile = b'' final_tile = b''
for layer in layers: for layer in layers:
final_tile = final_tile + io.BytesIO(layer).getvalue() final_tile = final_tile + io.BytesIO(layer).getvalue()
return final_tile return final_tile
class GetTile(tornado.web.RequestHandler): class GetTile(tornado.web.RequestHandler):
def get(self, zoom,x,y): def initialize(self, pool):
self.pool = pool
async def get(self, zoom,x,y):
self.set_header("Content-Type", "application/x-protobuf") self.set_header("Content-Type", "application/x-protobuf")
self.set_header("Content-Disposition", "attachment") self.set_header("Content-Disposition", "attachment")
self.set_header("Access-Control-Allow-Origin", "*") self.set_header("Access-Control-Allow-Origin", "*")
response = get_mvt(zoom,x,y) async with self.pool.acquire() as connection:
self.write(response) response = await get_mvt(connection, zoom,x,y)
self.write(response)
async def get_pool():
pool = await asyncpg.create_pool(dsn = dsn)
layers = GetTM2Source(os.getenv("MAPPING_FILE", "/mapping/data.yml"))
# Make this prepared statement from the tm2source
create_function = GenerateFunction(layers)
async with pool.acquire() as connection:
await connection.execute(create_function)
return pool
def m(): def m():
if __name__ == "__main__": enable_pretty_logging()
# Make this prepared statement from the tm2source io_loop = tornado.ioloop.IOLoop.current()
application = tornado.web.Application([(r"/tiles/([0-9]+)/([0-9]+)/([0-9]+).pbf", GetTile)]) pool = io_loop.run_sync(get_pool)
application = tornado.web.Application([(r"/tiles/([0-9]+)/([0-9]+)/([0-9]+).pbf", GetTile, dict(pool=pool))])
print("Postserve started..") print("Postserve started..")
application.listen(8080) application.listen(int(os.getenv("LISTEN_PORT", "8080")))
tornado.ioloop.IOLoop.instance().start() io_loop.start()
m() if __name__ == "__main__":
m()

Loading…
Cancel
Save