Using Elastic APM to visualize asyncio behavior

A few weeks ago, Chris Wellons’ blog post “Latency in Asynchronous Python” was shared on our internal #python channel at Elastic, and as someone looking into the performance characteristics of a recent asyncio-related change, I gave it a read, and you should too. It was a timely post: it made me think ahead about a few things, and it coincided nicely with our recent adoption of Elastic’s Application Performance Monitoring solution.

“It’s not what you know, it’s what you can prove.”

—Detective Alonzo Harris in the movie Training Day

Background

I work on the billing team for the Elastic Cloud, where we process usage data and charge customers accordingly. One of the ways we charge is via integrations with the AWS and GCP marketplaces (with Azure on the way), giving customers the ability to draw from committed spend on those platforms and consolidate their infrastructure spending under one account. It’s a feature a lot of customers want, but it introduces some complexity compared to how we bill our direct payment customers.

While most of our billing is done in arrears, at the end of the month, marketplaces require that charges are reported to them as they occur. As such, we have a system that reports marketplace customer usage every hour.

The initial version of this service, while built using Python’s async/await syntax, didn’t leverage a lot of the concurrency opportunities the asyncio library provides: “premature optimization” and all [0]. It was designed to be able to take advantage of them, but initially worked as a generator pipeline. It was a greenfield project starting with no customers, so building something that worked correctly was more important than its speed. It is billing software, after all.

async def report_usage(bills):
    async for bill in bills:
        await submit_bill(bill)

async def main():
    users = get_users_to_bill()
    clusters = get_clusters_used(users)
    bills = generate_bills(clusters)
    await report_usage(bills)

This is basically what our pipeline looked like: it’s coroutines all the way down. Because get_users_to_bill, get_clusters_used, and generate_bills are async generators, calling them doesn’t initially do anything. That is, until we hit the final await report_usage(bills). Once that is awaited, the async for loop inside report_usage is what starts pulling everything through: it finds one user with usage, gets their cluster details, computes their bill, submits it, then keeps iterating through all of the users with new usage. With a small userbase this runs “fast enough”, but we’re doing a lot of stuff sequentially that we don’t need to. Submitting one user’s bill shouldn’t block us from finding another user’s clusters, which shouldn’t block us from generating a third user’s bills, all of which involve platform-specific REST APIs, Postgres queries, Elasticsearch searches, and more. It’s a very I/O-heavy application.
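
For illustration, the earlier stages might have looked roughly like this; the helpers query_users_with_usage, fetch_clusters, and compute_bill are hypothetical stand-ins for the real Postgres queries, Elasticsearch searches, and marketplace API calls:

async def get_users_to_bill():
    # An async generator: nothing runs until something iterates it.
    for user in await query_users_with_usage():  # hypothetical helper
        yield user

async def get_clusters_used(users):
    # Each stage lazily consumes the previous async generator.
    async for user in users:
        yield user, await fetch_clusters(user)  # hypothetical helper

async def generate_bills(clusters):
    async for user, user_clusters in clusters:
        yield await compute_bill(user, user_clusters)  # hypothetical helper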

The following sentence of Knuth’s “premature optimization” quote [0] (“Yet we should not pass up our opportunities in that critical 3%.”) recently became more relevant, or critical. This system is not only processing more marketplace users than before, it’s processing multiple platforms of users instead of only GCP. Maybe it’s time to dig deeper into asyncio and optimize.

Let’s Get Concurrent

Once we have all of the users, a first cut is to process them concurrently and get an immediate boost. There are a lot of users now, so we need to restrict how concurrent we can be so we don’t overload anything, especially not the cloud platforms we’re reporting to.

import asyncio

async def process_user(user, semaphore):
    # Only MAX_CONCURRENCY tasks get past the semaphore at once.
    async with semaphore:
        clusters = await get_clusters_used(user)
        bill = await generate_bill(clusters)
        await submit_bill(bill)

async def main():
    tasks = []
    sem = asyncio.Semaphore(MAX_CONCURRENCY)

    async for user in get_users_to_bill():
        tasks.append(asyncio.create_task(process_user(user, sem)))

    await asyncio.gather(*tasks)

That works fairly well! It runs many times faster than our old pipeline in wall-clock time, but if we have 1,000 users and MAX_CONCURRENCY=50, this starts 1,000 tasks while allowing only 50 of them to do work at a time. The final task to run might do one second of real work, but it waited many seconds for the opportunity.
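
You can see that wait directly by timing the semaphore acquisition. Here’s a self-contained toy (not our service code) with 1,000 simulated users and one second of pretend work each:

import asyncio
import time

MAX_CONCURRENCY = 50

async def process_user(user_id, semaphore):
    created = time.monotonic()
    async with semaphore:
        waited = time.monotonic() - created
        await asyncio.sleep(1)  # stand-in for one second of real work
        print(f"user {user_id} waited {waited:.1f}s to do 1s of work")

async def main():
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    tasks = [asyncio.create_task(process_user(i, sem)) for i in range(1000)]
    await asyncio.gather(*tasks)

# The last batch of tasks reports roughly 19 seconds of waiting.
asyncio.run(main())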


A quick aside: Chris’ post is mostly about how this situation affects background tasks when a big group of new tasks, like our process_user tasks, starts up. That problem doesn’t really apply to this system, as it’s a standalone application initiated by cron that processes users and then exits. There aren’t any other background tasks to interrupt…yet.


Bringing APM Into It

We already had logs and metrics flowing into Elasticsearch, but those only tell part of the story. Elastic’s observability solutions include a pretty cool APM product, and we love to use our own products, so we started using the elastic-apm package to send performance data to APM Server.

import elasticapm
client = elasticapm.Client()  # get config from env

async def process_user(user, semaphore, parent_tx):
    # The transaction begins as soon as the task starts, before the
    # semaphore is acquired, so time spent waiting counts against it.
    client.begin_transaction(
        "user", trace_parent=parent_tx.trace_parent
    )
    elasticapm.set_user_context(user_id=user.user_id)

    async with semaphore:
        ...
        client.end_transaction("user task")

This creates a "user task" transaction for each user we’re going to process, and inside it we have several spans for the internal functions that do the work, like gathering different types of usage via searches and reporting usage to platform-specific REST APIs.
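
Those spans come from wrapping the interesting awaits. A minimal sketch, assuming the agent’s async_capture_span context manager; gather_usage and compute_bill are made-up helper names:

import elasticapm

async def generate_bill(clusters):
    # Spans nest under whatever transaction is active on this task.
    async with elasticapm.async_capture_span("gather-usage"):
        usage = await gather_usage(clusters)  # hypothetical helper
    async with elasticapm.async_capture_span("compute-bill"):
        return await compute_bill(usage)  # hypothetical helper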

If we look at what a run of the service shows in the APM timeline with MAX_CONCURRENCY=2 and three users to process, we see the following:

[Image: APM timeline before the refactor (/images/apm/before.thumbnail.png)]

Here we have three "user task" transactions stacked on top of each other, all started at the same time, but with only the top two making progress. The third task begins doing actual work only after one of the first two ends.

Given the way the code was written we expect that, and without APM we might even think this is fine. Frankly, given how the system needs to work today, it is sort of fine. That is, until we need to look at how long user tasks are running for and realize our metrics and understanding are way off.

Whether we’re running for three users or three thousand, each individual user should take around the same amount of time to process, but the way we wrote this, just about every metric will be useless. Even in our tiny example above, the third task clearly completed its actual work the fastest, but it gets penalized for having done nothing for about two-thirds of its lifetime.

Refactoring

Chris’ post mentions using an asyncio.Queue for the concurrent tasks we want to run. Earlier I mentioned that our service doesn’t need to worry about anything but those "user task" instances, so if we could live with useless metrics we’d be OK [narrator’s voice: they can’t], but we do have plans that will need to solve what Chris is solving.
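
A rough sketch of that pattern, following Chris’ post rather than our exact code: start MAX_CONCURRENCY workers and feed them from a queue, so a user’s transaction doesn’t begin until a worker actually picks that user up. (process_user here no longer needs the semaphore.)

import asyncio

async def worker(queue):
    while True:
        user = await queue.get()
        try:
            await process_user(user)  # begin_transaction now happens here
        finally:
            queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=MAX_CONCURRENCY)
    workers = [asyncio.create_task(worker(queue)) for _ in range(MAX_CONCURRENCY)]

    async for user in get_users_to_bill():
        await queue.put(user)  # applies backpressure when the queue is full

    await queue.join()  # wait for all queued users to be processed
    for w in workers:
        w.cancel()  # workers block forever on queue.get(); cancel them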

After implementing roughly what he did, we end up with a much more useful timeline [1].

[Image: APM timeline after the refactor (/images/apm/after.thumbnail.png)]

We see the same three stacked transactions with the first two beginning right away, and now we see the third doesn’t begin until one of the first two ends. Perfect! Now the third, or three thousandth, task will take only as long as it actually takes that task to run.

Not only are our metrics more useful in the short term, we’re now ready for those changes I mentioned where we need to give background tasks a fair chance. If we’re 200 requests into those 3,000 and a new task needs to run, it gets a chance as the 201st instead of as the 3,001st.

Using APM lets us visualize the impact our changes have and gives us the ability to track all of that over time. It’s a pretty great tool and we’re excited to use it even more.

Conclusion

  1. asyncio is pretty powerful, but it can be a bit difficult to understand and it’s not the right solution for every problem. If you’re interested in learning more about it, Łukasz Langa, a core CPython developer who works on EdgeDB (a database built on top of Postgres via the excellent asyncpg library, which we use a lot!), has created a video series all about how asyncio does what it does. I highly recommend it.

  2. APM, Elastic’s or otherwise, is a really valuable tool. I thought I knew how some of our stuff was working, but proving it via APM has helped inform a bunch of decisions already in the short time this service has been using it.

[0]

“We should forget about small efficiencies, say about 97% of the time: premature optimization is the root of all evil.” — Donald Knuth, “Structured Programming With Go To Statements”

[1]

Don’t pay attention to the exact timing differences between the two implementations. These timelines are generated from an integration test on my laptop that is varying levels of overheating.