
All right — my name is David Golden, I'm an employee at Heroku, and I'm talking to you today about the Celery canvas module. Celery ships with a lot of tools for composing tasks together into workflows, but as I've talked to people about their use of Celery, a lot of them are either unfamiliar with these tools or afraid of them, because they're not quite sure how they work and don't really trust them. So my goal today is to pull back the curtain, show you what's going on inside these workflow tools, and show you how you can even extend them and create your own.

Before we get to that, I want to talk about some design principles for Celery tasks in general, apart from the canvas workflows. In my opinion, a good Celery task is atomic: in a concurrent system you don't want to leak intermediate state out to other parts of the system, so you should have clear success and failure scenarios, where any state changes happen as immediately as possible and only when the task succeeds. That keeps your tasks from interfering with other tasks or other parts of your system. I think tasks should also be idempotent. Tasks will fail in creative ways — you lose a process, an API goes down, that sort of thing — and you want to be able to pick up where you left off and execute the task over again without manual intervention. If your tasks are idempotent, you get that for free. And for the purposes of our workflows, tasks should be composable. This is the same idea as in functional programming: think of your task like a function that has one concern, takes arguments for all the state it needs (or ways to access that state), and returns something that's useful for other functions to consume.
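As a loose illustration of those three principles — this is not code from the talk, and the task names are made up — a pair of small tasks might look like this:

```python
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def normalize_email(address):
    # Idempotent: running it twice on the same input is harmless.
    # Composable: it takes everything it needs as an argument and
    # returns a value that downstream tasks can consume.
    return address.strip().lower()

@app.task
def send_welcome(address):
    # Atomic in spirit: do one thing, and only report success once the
    # externally visible effect has actually happened.
    print('sending welcome mail to %s' % address)
    return address
```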
A lot of people focus on their choice of Celery broker, but they overlook the result backend as an architectural choice. The result backend is really important for these workflows, so it's worth taking some time to talk about what it does and which one you should choose. First of all, the result backend is in charge of tracking task state. When a task is sent to the broker, Celery marks it as pending; when it's picked up by a worker it's marked as started, and then success or failure. You need a result backend to know what state a task is in, and of course to know what it returned — whatever value or object the task function returned.

So which backend should you choose? I implore you, if you take nothing else away from this talk: please do not use RabbitMQ as your result store. What actually goes on there is that Celery uses a message queue in Rabbit to hold the result, because queues are Rabbit's only tool — everything looks like a queue to Rabbit — so it stuffs the result or state into a queue, and when a client asks for that result or state, Rabbit dequeues the message. The first client that asks where the task is will consume that message, and the next one that asks gets nothing; there's nothing left to get, so the next time you ask for the task's state it says "I don't know, it's pending."

You can also use a database as your result store, using django-celery and the Django ORM models it provides for results, but pulling in a database can be expensive, and it doesn't support all of the operations that make our workflows work well, as we'll see later. Also, if you happen to be unfortunate enough to be using MySQL, you can run into problems with your transaction isolation level, where one process sees one answer and another sees a different one for what state a task is in.

Memcached is a pretty good choice for a result backend, but it's not persistent, so if you care about your results sticking around when a process restarts, it's probably not your best choice. It does have atomic counters, which are nice, and it supports key expiry, which matters because you don't want results hanging around indefinitely. That persistence problem is why I normally choose Redis for my result backend: it has all of the same properties as Memcached, with the exception of sharding, and you can usually keep it reasonably persistent — you could lose a little bit of data, but generally not much.
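A minimal configuration sketch, reusing the app object from the earlier snippet and assuming Celery 3.x-style setting names and a local Redis:

```python
app.conf.update(
    BROKER_URL='amqp://guest@localhost//',             # broker choice
    CELERY_RESULT_BACKEND='redis://localhost:6379/1',  # result store choice
    CELERY_TASK_RESULT_EXPIRES=3600,                   # don't keep results forever
)
```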

All right, so now let's get to the actual building blocks of canvas itself. The most fundamental building block of the Celery canvas tools is the signature, and it's important to remember as we go forward that everything in the Celery canvas module is based on Signature. So what is a signature? A signature is simply a serializable representation of a task. It's a subclass of dict, and it contains the import path for the task itself, any arguments and keyword arguments that need to be passed to that task, and other options. So you can json.dumps a signature; and signature() also accepts a dictionary, so we can dump it, load it back in, and pass it to signature() to rehydrate it. You can see how that's useful when passing a task across the message bus and consuming it on the other side — this is basically how task serialization works in general. When you send a task over the wire, Celery is always creating a signature, passing that signature through the message broker, and handing it back to a worker to be consumed.

The subtask_type field we'll get to a bit more later. For normal tasks it's None (null in the serialized form). For the workflow types — which aren't actually tasks, but serialized representations of a workflow or a group of tasks — the subtask_type is a key that tells Celery which registered signature class to use to reconstruct the signature object. There's a mapping in Celery from subtask_type to signature class: None just creates a plain Signature, but if it's something like 'chain' or 'group', Signature knows about those registered types and uses the subtask_type to pick which Signature subclass to rehydrate.

There are convenience methods on signatures that let you treat them much like tasks; for a single-task signature they pass straight through to the task, so you can call delay and apply_async on them. There are also convenience methods on tasks themselves to get a signature: in Celery 3.1 it's still .subtask, in 3.2 it's .signature, and .s for short works in both. You can pass those methods args and kwargs, which get stored in the signature and serialized, and you can even keep building up arguments afterwards — with the exception of immutable signatures. Signatures can be marked immutable, which freezes their arguments. That's useful in situations where something else is trying to add task arguments for you, as we'll see later, and you want to prevent that from happening: just mark the signature immutable.

Now that we're able to serialize tasks, we can begin to compose them by constructing tasks that interact with other tasks. The simplest unit of composition is a callback: a task that is launched when another task finishes, with the result of the parent task passed to the child as its first argument. Here we have a couple of signatures for a task that just adds numbers together, but the second one only has a single argument. When we call the .link() method on the first signature, you can see that when we convert it to a dictionary, the link option is set to the list of tasks linked to it. When Celery finishes the first task it asks, "are there any linked tasks for me to run? If so, run them and pass my result in as an argument." So here the result of the child task is six: one plus two is three, and then three plus three is six.
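A quick sketch of what that looks like in code — assuming an add task like the one on the slides, and reusing the app from earlier:

```python
import json
from celery import signature

@app.task
def add(x, y):
    return x + y

sig = add.s(1, 2)                            # a Signature, which is a dict subclass
payload = json.dumps(sig)                    # so it serializes cleanly
rehydrated = signature(json.loads(payload))  # ...and rehydrates back into a Signature

# A callback: the parent's return value becomes the child's first argument,
# so the child computes 3 + 3 = 6.
parent = add.s(1, 2)
parent.link(add.s(3))
result = parent.apply_async()
```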
The AsyncResult we get back from calling apply_async on the original signature is the parent task's result, so result.get() is 3, and you have to navigate down through the .children property to see the dependent (child) tasks, and look at their results to see the result of the callback.

We go one step further with chains. Chains are multiple tasks run in series, and they're actually chained together via this same callback mechanism. Think of a callback as one task launching another; a chain is one task launching another, which launches another, for as many tasks as you specify. When we construct a chain, we can give it any number of task signatures.
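A minimal chain sketch, reusing the add task from above:

```python
from celery import chain

# 1 + 2 -> 3, then 3 + 4 -> 7, then 7 + 8 -> 15
workflow = chain(add.s(1, 2), add.s(4), add.s(8))
result = workflow.apply_async()

result.get()          # 15 -- the AsyncResult belongs to the *last* task
result.parent         # AsyncResult of the middle task
result.parent.parent  # AsyncResult of the first task in the chain
```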

When we cast the chain to a dictionary, those signatures get put into a tasks keyword argument. The result from the chain is actually the AsyncResult of the last task in the chain. That's an important distinction, because the way you treat that result is different from a plain callback. It makes sense for a chain: when you chain things together, you're essentially declaring a set of dependencies for the last task, and what you really want to know is the end result, so Celery hands you back the AsyncResult of the last task. If you inspect its state you'll see pending, pending, pending while the earlier tasks in the chain execute, but the parent property gets populated, so you can navigate .parent.parent.parent back up the chain to the very first task and inspect the state of the chain as it executes, if you want to.

So what's going on under the hood? (These aren't complete code slides — just as much as I can fit.) When you run the chain, it goes through a step that prepares all the tasks, walking through each one and assigning the current task as the link of the previous task — it takes the task before it and says, "I want to link the current task to you" — and that creates this chain of callbacks. Remember, .link is how we make one task a callback of another. That prepare step returns all the tasks and their AsyncResult instances, and the run method just takes the first one and calls apply_async on it. It's like lining up all the dominoes and tipping the first one over; the rest fall in series.

Next, after chain, we have group. Group is entirely different: it's not built on callbacks or chains, and it's intended for a group of tasks that are meant to run in parallel rather than in series. One important difference between group and chain is that with a chain you care about the last result in the series, because you want the end result; in a group, all of the tasks are treated as equals, because they all run in parallel. So what Celery decided to do was create a new AsyncResult subclass called GroupResult, whose .results or .children — they're the same thing on a GroupResult — are the AsyncResult instances for each child task. Notice that the GroupResult has its own UUID, distinct from any of the UUIDs of the child tasks. That matters because normally, when you ask Celery for the result or state of a job, it keys off the task's UUID in the result store — but there is no stored state for a GroupResult, because its UUID doesn't map to any task that's actually being executed; it's an abstract handle for a group of tasks. The serialized version looks very similar to a chain; the only real difference is the subtask_type of 'group'. The signature is registered under the type 'group', so Celery sees that and rehydrates it as a group instance. And if we iterate through the child results, we get the result of each task.
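A small group sketch, again reusing add:

```python
from celery import group

g = group(add.s(i, i) for i in range(5))
gr = g.apply_async()             # a GroupResult, not a plain AsyncResult

gr.id                            # the group's own UUID, distinct from its children's
[r.get() for r in gr.results]    # [0, 2, 4, 6, 8]
```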
There's a bit of coordination inside Celery here because of this distinction between an AsyncResult and a GroupResult — there's a lot of branching of the form "if this is a group, do this; if it's a regular AsyncResult, do this other thing" — but you can see that apply_async constructs a GroupResult from the list of applied subtasks: a generator that goes through, calls apply_async on each individual task signature, and yields its AsyncResult. So what you get is a group ID, which is the UUID of the GroupResult I mentioned, plus all of the AsyncResult instances from the subtasks.

Now that we've covered chains and groups, what happens when you want to combine them? The chord is the most complex of all the Celery canvas workflow types, and it's basically a group chained to a callback: you have a group of tasks executing in parallel, and when all of them are done — really, when all of them have succeeded — it launches the callback task. Celery calls the group the header and the callback task the body.
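A chord sketch, reusing add and adding a tsum task that sums a list (similar to the one on the slides):

```python
from celery import chord

@app.task
def tsum(numbers):
    return sum(numbers)

# header: five adds run in parallel; body: sum their results
result = chord(add.s(i, i) for i in range(5))(tsum.s())

result.get()    # 20 -- the AsyncResult belongs to the body task
result.parent   # the GroupResult for the header group
```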

You can see the body and the header in the dictionary version of the signature. The body task gets a list of all of the results from the header tasks, so the tsum task here gets that list as the argument Celery passes to it, and in this case it just sums the numbers together. The AsyncResult returned from executing the chord is similar to the chain's: it's the result for the body task, so when you call .result or .state on it, you see the state or result of the body task, and if you navigate back to its parent, it's linked to the GroupResult corresponding to the group of header tasks. So you have this slightly asymmetric relationship where the AsyncResult's parent is the group, but the group's children are the group's own subtasks — you can't actually get from the header group to the body's result. It's an odd family tree, but you can still inspect each task and result in the group, get the individual results, and then see them summed together by the callback task.

This requires a fair bit of coordination from the result backend in order to work. The chord knows how many tasks are in its header group and stores that as its chord size. Then, any time a member of a group that's part of a chord completes successfully, the task tracer reacts in some way — depending on the result backend — to determine whether the chord's callback should be executed at that point. As I said, different backends work in different ways, and this is where atomic counters come in handy: with Memcached and Redis, once the header group has been dispatched, every time a member of that group succeeds it increments a counter keyed on that group UUID — remember, the group ID is distinct — and once the counter reaches the chord size, Celery knows it's okay to run the callback. It gathers all the results from the header tasks in the group and passes them to the body callback. For all the other backends, Celery launches a separate task whose only job is to poll the group for the results of its children: if all the children are in a success state, it calls the callback task; if not, it retries with a certain countdown — I can't remember exactly how long — and keeps polling until the group is finished. So you can see how it's advantageous, if you want that callback to run quickly, to use a result store that supports atomic counters.

Those are all of the primitives Celery provides out of the box for composing workflows, but if you wanted to, you could have built something like chain yourself without any special support from Celery — it's just a subclass of Signature, and Celery's support for the link function, for linking callbacks, gives you everything you need to chain tasks together. So I've created something I call a weave, just as an example of how you can extend the existing primitives to create your own distributed workflows.
A weave is a custom Signature subclass that takes a task and a numeric list size. It accepts a large list as its first argument and splits it into smaller lists according to the size you specify; it passes each of those smaller lists into the task you gave it, and then it calls a join task that joins the resulting smaller lists back together into a single big list and returns that to you. In this case, range does exactly what you'd expect — it gives you a list of numbers — and multiply takes each member of the list, multiplies it by 2, and returns the resulting list. So you can trace the path of what the weave does in order to end up with a list that's the same as the original, with each member multiplied by 2.
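The actual Weave subclass is more code than fits on a slide, but the core mechanic is roughly the following sketch: chunk the input, clone the worker task once per chunk, and chord those clones onto a join task. The helper name weave_like and the tasks below are illustrative, not the talk's real code, and it reuses the app from earlier:

```python
from celery import chord

@app.task
def multiply(numbers):
    return [n * 2 for n in numbers]

@app.task
def join(lists):
    # flatten the list-of-lists produced by the header group
    return [item for sub in lists for item in sub]

def weave_like(task_sig, items, size):
    """Apply task_sig to chunks of `items` in parallel, then re-join them."""
    header = [task_sig.clone(args=(items[i:i + size],))
              for i in range(0, len(items), size)]
    return chord(header)(join.s())

result = weave_like(multiply.s(), list(range(10)), size=3)
result.get()   # one flat list with every member doubled
```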

There's a little more code than I could easily fit on the slide, but you can see I call this class decorator, register_type, on the Signature subclass, and I provide a subtask_type of 'weave'. That tells Celery to register this signature class as type weave, so any time it sees subtask_type 'weave', it's going to construct a Weave instance. I pass 'weave' as the task argument to the init; that argument normally represents the task associated with the signature, but since there's no real task associated with this signature, I could have put None there just as easily — Celery is never going to try to launch an actual task for a signature subclass that doesn't have a one-to-one mapping to a real task. In my apply_async, what I'm basically doing is taking an iterable from the args — which I assume get applied to the signature at some point after init, because the init only takes the task and the list size — and then I create a chord, with the join task as its body and a group of header tasks cloned from the original task I passed in. You can see down at the end: task.clone, where I provide an arg that's a subset of the original list as I chunk the iterable into the size I want. So I end up with a group of tasks that are all the same task with different sub-lists passed to them, plus a join task that joins the lists back together; I build a chord out of that and return it. That's my custom signature subclass.

So why would I do this? Let me give you a practical example with a Twitter API integration. The Twitter API lets you ask for friend or follower IDs in batches of up to 5,000 IDs per API call, and because we want to consume the API as efficiently as possible when fetching all of your friends or followers, of course we ask for the maximum number of IDs per call. But if we want any other information about those users, we have to request the full user objects, and Twitter limits those calls to 100 IDs per call. So to get the usernames for five thousand friends, we'd make a single API call for all the IDs, and then fifty API calls of a hundred IDs apiece to get the usernames out of the user objects. You can see how this is a natural fit for the weave concept. Without any special coordination — with just this weave signature subclass and those basic tasks — I can construct a workflow that results in a single list of all of my Twitter friends' usernames, by chaining the friend-IDs call into a weave containing a username task with a size of 100. I'm making calls of 100 IDs at a time to the users endpoint, getting the user objects, extracting the screen names, and the join task joins them all back together into a single list. This is just an example of the kinds of things you can do with Celery canvas workflows by creating your own Signature subclasses.

It's important to note what happens when one or more tasks fail in these workflows, because it may be unintuitive, and it may be catastrophic depending on how you architect your tasks, so you need to know what happens. When you run tasks in parallel, it's pretty benign: you get the state, success or failure, for each task individually, and you can inspect the results. Remember that in a group all tasks are created equal, so nothing is going to prevent anything else from running — you just see that some tasks failed and some succeeded.
If you're running tasks in series, it's like flicking one domino out of a line of dominoes: it takes one falling over to hit the next, and if there's a gap or one missing — that's the analogy for a task failure — it's not going to knock over the next domino, and all of the subsequent tasks will fail to run. This makes logical sense, because things in a series effectively form a dependency chain: if a prerequisite isn't fulfilled, you can't run the thing that depends on it. But it's something to be aware of. You should be very defensive in your coding about retrying tasks that hit transient failures until they succeed, and understand that any chained tasks or callbacks won't run if the parent task fails.
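A common defensive pattern for those transient failures might look like this sketch — the endpoint is made up, and it reuses the app from earlier:

```python
import requests

@app.task(bind=True, max_retries=5, default_retry_delay=30)
def fetch_profile(self, user_id):
    try:
        resp = requests.get('https://api.example.com/users/%s' % user_id,
                            timeout=10)
        resp.raise_for_status()
    except requests.RequestException as exc:
        # Transient failure: retry instead of failing outright, so any
        # chain or chord depending on this task still gets to run.
        raise self.retry(exc=exc)
    return resp.json()
```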

So I've put together a few rules of thumb that aren't directly related to workflows but are some best practices from my years of using Celery. The first one, I feel, is pretty obvious: never block on a task from a task — or from any process, really. The whole idea of Celery is that it's meant to be concurrent, and if you have one process waiting on another process to finish while doing nothing, you're wasting your concurrency. Always find ways to compose tasks together rather than blocking on one task and getting its result synchronously before moving on.

You also want to design your tasks to do the smallest useful amount of work. The reason I say "useful" is that there's a fair bit of overhead involved in serializing a task, sending it over the network to your message broker, sending it back over the network as a worker consumes it, and deserializing it before you even begin to run your function. I heard one company say they were putting all of their database writes into Celery tasks — one database write per task — and of course they were seeing terrible performance, because the overhead of just doing the write in-process is less than all the work of turning it into a task. But if you're doing an API integration, like the Twitter example I showed, a really good rule of thumb is something like one API call per task. The smallest useful unit of work gives you the best return for your concurrency buck.

A lot of people don't know about the soft and hard time limits in Celery, but they can be super useful for getting some guarantee about how your worker pool resources will be used. If you have tasks that can, in some circumstances, take a really long time — maybe because Facebook is slow right now, or some other resource is constrained — it can be really useful to interrupt those tasks. If you've made them idempotent, there's no downside to running them again, and if you've already made them small units of work, you haven't lost much. You can see how all these design principles work together to create a system where you're using your concurrency well, where you can fail without worrying about it, and where the time limits give you good guarantees about how long your tasks will take, at most, before they get killed. It's important to set both soft and hard limits, because some tasks don't respond to the exception that gets raised for the soft timeout — like when a thread is waiting on a join — and they have to actually be killed.
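Per task, that might look like this sketch (the limits can also be set globally in configuration; the task body here is just a stand-in for slow I/O):

```python
import time
from celery.exceptions import SoftTimeLimitExceeded

@app.task(soft_time_limit=60, time_limit=90)
def crawl(url):
    try:
        time.sleep(300)        # stand-in for a slow network call
        return url
    except SoftTimeLimitExceeded:
        # Raised at 60s: a window to clean up or give up gracefully
        # before the hard limit kills the worker process at 90s.
        return None
```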
acks_late is another underused feature in Celery. When a worker consumes a task from a broker, the broker commonly sets a reservation for how long it will hold that task before re-issuing it onto the queue for another worker, and the way a worker tells the broker "it's all right to remove this task from the queue" is by acknowledging — acking — that it has received it. Celery workers by default ack when they start working on a task, but you can imagine workers dying mid-process without being able to communicate back to the broker. Celery will try to send the task back to the queue, but that's not always possible: if there's a network outage or a power outage, all kinds of things can happen and processes just die. So when you ack early, you can lose tasks: a worker partially worked the task and then went away, it wasn't able to requeue it, so it never went back onto the queue, and your task is dropped on the floor. With acks_late, the worker won't ack the task until it reaches a terminal state — either success or failure — and in that case you're guaranteed the task will be re-executed if something happens to the worker. The risk is that if the reservation time on the broker expires before the worker can ack, the task goes back into the queue and gets worked again by another worker. So you can see how this works in tandem with your soft and hard timeouts: you can guarantee an execution window for a task, and if you're making your tasks idempotent and quick and all that good stuff, acks_late gives you much better safety against tasks being lost midstream.
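Enabling it is just an option, per task or globally (3.x-style setting name shown; with the Redis broker the "reservation" corresponds to the visibility timeout, which you would size to comfortably exceed your hard time limit):

```python
# per task... (hypothetical task name)
@app.task(acks_late=True, soft_time_limit=60, time_limit=90)
def import_chunk(rows):
    ...

# ...or globally
app.conf.CELERY_ACKS_LATE = True
```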

Yeah — that little note from the Celery docs explains it pretty well. There's another one I didn't make a slide for but want to cover: in Celery 3.1, at least, the default serialization method is pickle. I don't know if any of you saw Alex Gaynor's talk at PyCon this past year, but pickle can do very bad things to software, and it's not only a security concern. Say, for instance, you want to send a Django model instance as a Celery task argument — which is already a bad idea to begin with. If you're using pickle, you send your task to the worker, it serializes everything as well as it can, and let's say while it's in flight you do a database migration or a deploy or something that changes the model. The model instance gets unpickled with its old class definition, and now you have Celery tasks running against a class definition that doesn't match, and bad things will happen. That's the problem with pickle in general. The easiest solution is to just use the JSON serializer: not only does that avoid all of the security concerns around the arbitrary code execution pickle potentially gives you, it forces you to use task arguments that are JSON-serializable, and I feel like that's just good for everybody. In fact, so did Ask — he made JSON the default in Celery 3.2, and it gives you a big nasty warning if you use pickle in 3.2 or later.
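Flipping the serializers over is just configuration — 3.x-style setting names, reusing the same app object:

```python
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],   # refuse pickled messages entirely
)
```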

So I think that's all I had — now I'll take any questions you have, or I can keep talking about the Celery stuff that's on the slides.

[Audience question about signature equality.] So the question was, I assume, how do you determine signature equality. The __eq__ doesn't traverse the args and kwargs to determine whether two signatures are effectively the same — that is, pointing at the same task with the same arguments — so if you do tasks.add.s(1, 2) == tasks.add.s(1, 2), it gives you False, because it's looking at the Python object IDs. Unfortunately that's not implemented, though I feel like it could be, pretty easily. One thing you can do, though, if you're passing a signature around and you're afraid of two different processes trying to run that same signature: you can freeze the signature. The .freeze() method will go ahead and assign a UUID as the task ID for that specific signature and give you back an AsyncResult instance, and any time you get an AsyncResult from that signature in the future, it'll use the frozen one. That can be useful — or at least you can tell that the same task was run twice, because it'll run with the same task ID; things will get really screwy, but at least you can tell.

[Audience question: "My objective is — I'm adding multiple items to the task queue, but I want the ability to reduce multiple items into a single worker process."] So the question was, can you reduce multiple tasks so they're worked in the same process. Tasks are meant to be independent units of work — not really related to other tasks unless you're explicitly relating them together — but what you can do is determine, either in real time as you launch the task or ahead of time in the task definition, which queue that task goes to, and then you can have different worker pools working different queues; a single worker pool can work as many or as few queues as you want. So if you want to make sure the same Celery worker is going to run all of these tasks, you can create a queue specific to that worker and send those tasks through that specific queue. It won't squash them all down into one function call — it'll still run task after task — but at least you'll be in the same process space. Does that help at all? [Follow-up.] Right — you could also have some sort of counter, or some external state that flags how many of these are in flight right now, or some sort of distributed lock; there are different mechanisms you can use on top of Celery, outside of Celery, to help control the flow. I don't know your specific scenario, so it's something to keep looking into.
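That kind of routing is a sketch like this — the task and queue names are made up:

```python
# route a single call at dispatch time...
process_item.apply_async(args=(42,), queue='pinned_worker')

# ...or ahead of time in configuration
app.conf.CELERY_ROUTES = {
    'tasks.process_item': {'queue': 'pinned_worker'},
}

# then bind one dedicated worker to that queue:
#   celery -A tasks worker -Q pinned_worker --concurrency=1
```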
[Audience question: is anyone feeding callbacks into a graphical console to observe the progress of something like the weave?] In a graphical way, the only thing you really have at your disposal is that Celery provides methods that will build a Graphviz visualization out of an AsyncResult's dependency tree — you have .children and .parent, which form an inherent graph, and you can render that with Graphviz. It's kind of expensive to do, so I don't think anybody's doing it in real time. Celery also has a monitoring tool called Flower, a web interface where you can track what tasks are in flight; there's a URL endpoint for each task UUID, so you can see how a task progressed, when it started, when it ended, and I think you can link to parent and child tasks from within that. So you can use that web monitoring tool to stitch together your own internal visualization of what's going on, but the Graphviz output is the closest thing Celery has to a supported graphical view of that. Thank you. Any more questions? I can keep talking about stuff if you want me to — I can go on all day.

Obviously, when you're working in a distributed system, log collation becomes really important, because you want a single place to look to answer "where did this thing run?" So, a quick shout-out for Heroku: our log collation is awesome. You can see all of your worker dynos collating into a single log stream and drain that to wherever you want, so if you're using a tool like Flower and you want to know what happened to a task — you want to see its log output — just take the UUID and, using Loggly or Splunk or whatever, search for that UUID in your collated logs. That's really the only sane way to debug Celery stuff, because you never know where something is going to run, and you often can't reproduce locally the issues you had in production. So I really recommend log collation, being able to search your logs, and keeping track of your task UUIDs somewhere. The answer to almost all bugs in distributed systems is more logging — things can almost always be solved with the right logging — so I really, highly recommend that.

Additionally, there are debugging tools that work best locally. There's a pdb variant in Celery called rdb that will let you set a breakpoint inside a task; it actually opens a telnet listener on a unique port per task, so you could have ten tasks running in parallel in a group, set an rdb breakpoint in those tasks, and it'll open ten telnet listeners on different ports. You can telnet into your Celery process and poke around just like you would with pdb, and that can be really useful for debugging.
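Using it looks roughly like this; the worker log prints the host and port to telnet into for each paused task:

```python
from celery.contrib import rdb

@app.task
def troublesome(x):
    doubled = x * 2
    rdb.set_trace()   # pauses here and opens a telnet listener for this task
    return doubled
```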

What else? Yes — a question about brokers? I didn't talk much about brokers, so the only thing I have to say is that if you're using anything other than RabbitMQ or Redis, you're asking for trouble. Even though there's support for things like SQS and IronMQ and CouchDB and so on, they're not fully supported: features will be missing, there are things you can't do — this Flower monitoring tool, for example, only works with the RabbitMQ and Redis brokers. I prefer RabbitMQ because I feel like it's a tool made for the job, but it's also more configuration to set up, and some people don't like the configuration hassle. If you don't have your file descriptor limits bumped up, you can crash RabbitMQ if you have a ton of tasks or a high queue depth; there are different things that can go wrong, and it's operationally more intensive to run RabbitMQ, but I feel like it's the best tool for the job. Redis is good enough that it's not a huge deal if you want to run Redis as your broker, but I wouldn't run any of the other ones.

Cool — well, if you have any more questions, I'll be here in Ohio for another couple of hours at least before heading to the airport, so track me down. And if you're looking for a job, as we all are, I'm sure you know we're hiring. Thank you.

[Thank you, David — great presentation.]
