For the last few years I’ve been taking photos of my daily life and posting them on Instagram. And for the last year and a half or so I’ve been using Ghost (https://github.com/tryghost/ghost) as the backend for my blog.

One of the major things I thought was missing, and really wanted to set up, was cross-posting from Instagram to my personal blog. That means grabbing the source images via the Instagram API, storing them in S3, and creating a post.

I had written a script that would grab images and upload them to S3, and I was manually posting things every month or two. But with some downtime over the recent holiday I made a (poorly written) end-to-end sync tool that backs up the Instagram images and posts them to the blog. The script is below:

#!/usr/bin/env python

import json
import os
import requests

from boto.s3.connection import S3Connection
from boto.s3.key import Key

import urllib2
import StringIO
import datetime

from slugify import slugify

CONN = None  # S3 connection, set below once AWS credentials are loaded

auth_token = 'instagram-api-token'

try:
    AWS_ACCESS_KEY = os.environ['AWS_ACCESS_KEY']
    AWS_SECRET_KEY = os.environ['AWS_SECRET_KEY']
    AWS_BUCKET_NAME = os.environ.get('instascraper_bucket', 'bucketname')
    CONN = S3Connection(AWS_ACCESS_KEY, AWS_SECRET_KEY)
except Exception, e:
    print 'AWS Credentials were not properly set'


def _key_name(id):
    # S3 key under which a photo with the given Instagram id is stored
    return 'instagram-photos/%s.png' % id


def get_ghost_token():
    # Log in to the Ghost v0.1 API with a password grant and return the bearer token
    res = requests.post('http://jake.ai/ghost/api/v0.1/authentication/token', data={
        'username': '[email protected]',
        'password': 'password-goes-here',
        'grant_type': 'password',
        'client_id': 'ghost-admin',
        'client_secret': 'q0f8hqf0hq'
    })

    return json.loads(res.content)['access_token']


def create_post(title, created_time, html):
    # Publish a post via the Ghost v0.1 posts API, tagged "insta"
    token = get_ghost_token()

    try:
        slug = slugify(title)
    except Exception:
        slug = '(untitled)'

    pd = dict(author="1",
              featured=False,
              image=None,
              language="en_US",
              markdown=html,
              meta_description=None,
              meta_title=title,
              page=False,
              published_by=None,
              slug=slug,
              status="published",
              tags=[{
                  "id": 7,
                  "uuid": "041d5867-9bcf-4f9e-a5a5-51cf7ab541d0",
                  "name": "insta",
                  "slug": "insta",
              }],
              title=title,
              published_at=created_time)

    h = {'Authorization': 'Bearer %s' % token, 'Content-Type': 'application/json'}
    res = requests.post('http://jake.ai/ghost/api/v0.1/posts',
                        json=dict(posts=[pd]), headers=h)

class InstagramPhoto(object):
    def __init__(self, image_dict):
        super(InstagramPhoto, self).__init__()
        self.id = image_dict.get('id')
        self.caption = None
        self.created_time = None
        if image_dict.get('caption'):
            self.caption = image_dict['caption'].get('text')
        self.instagram_image_url = image_dict['images']['standard_resolution']['url']
        self.instagram_url = image_dict.get('link')
        self.created_time = datetime.datetime.fromtimestamp(float(image_dict.get('created_time'))).strftime('%Y-%m-%d %H:%M:%S')

        self.s3_url = None

    def __repr__(self):
        return "InstagramPhoto(id=%s)" % (self.id)

    def upload_to_s3(self):
        # Upload the image to S3 if it isn't already there; returns True only for new uploads
        bucket = CONN.get_bucket(AWS_BUCKET_NAME)
        if bucket.get_key(_key_name(self.id)):
            print 'This image already exists in s3: %s' % self.id
            k = Key(bucket)
            k.key = _key_name(self.id)
            self.s3_url = k.generate_url(expires_in=0, query_auth=False)
            return False
        try:
            k = Key(bucket)
            k.key = _key_name(self.id)

            file_handle = urllib2.urlopen(self.instagram_image_url)
            file_content = StringIO.StringIO(file_handle.read())
            k.set_contents_from_file(file_content)
            k.set_acl('public-read')
            self.s3_url = k.generate_url(expires_in=0, query_auth=False)
            return True
        except Exception, e:
            print 'An error occured trying to upload %s: %s' % (self.id, e)

    def post_to_blog(self):
        # Wrap the S3-hosted image in a link back to Instagram and create the blog post
        raw_body = '''<a href="%(instagram_url)s"><img src="%(image_url)s" class="instagram" alt="%(caption)s"></a>'''
        body = raw_body % {
            'instagram_url': self.instagram_url,
            'image_url': self.s3_url,
            'caption': self.caption
        }
        post = {'title': self.caption, 'html': body, 'status': 'draft',
                'created_time': self.created_time}

        create_post(post.get('title'), post.get('created_time'),
                    post.get('html'))


# Pull the most recent media for the authenticated user from the Instagram API
raw_images = requests.get('https://api.instagram.com/v1/users/self/media/recent', params={
    'access_token': auth_token,
    'count': 1000
}).json()['data']

photos = [InstagramPhoto(p) for p in raw_images]

# Only create a blog post for photos that were newly uploaded to S3
for p in photos:
    if p.upload_to_s3():
        p.post_to_blog()

I’ve been playing with making a service that runs containers via Amazon’s Elastic Container Service (ECS). This evening I finally got to the point where it was time to start experimenting with schedulers and other such fun.

After a few quick tests, it became clear that the scheduler I’m working on might need more information to intelligently schedule tasks in the cluster. Let me try to illustrate what I mean:

  • Task definition: 100 MB of RAM, less than 1 core, and takes 5 minutes to complete
  • Container instance: 512 MB of RAM / 1 core

I queue 100 of these tasks in SQS. The scheduler quickly pulls the messages off the queue and tries to schedule the tasks in the ECS cluster. Once 5 tasks are running there isn’t enough memory left on the container instance for another one, so no new tasks can be placed until the existing ones finish.
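
The back-of-the-envelope math on those numbers (same made-up values as the list above):

instance_memory_mb = 512    # memory on the container instance
task_memory_mb = 100        # memory each task reserves
task_runtime_min = 5
queued_tasks = 100

tasks_at_once = instance_memory_mb // task_memory_mb            # 5 tasks fit at a time
batches = (queued_tasks + tasks_at_once - 1) // tasks_at_once   # 20 rounds of tasks
print('~%d minutes to drain the queue on one instance' % (batches * task_runtime_min))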

When ECS rejects a task, the scheduler does nothing further with the SQS message, so the message eventually gets redelivered (by default it becomes visible again after 30 seconds). This sounded like an easy way to do things, but if the cluster is full and high-priority tasks get rejected, lower-priority tasks can end up running first while the higher-priority messages are still invisible (waiting to be redelivered) in SQS.
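
A rough sketch of that naive flow (not my actual scheduler code; the queue URL, cluster name, and message format are all made up here):

import boto3

sqs = boto3.client('sqs')
ecs = boto3.client('ecs')

QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/task-queue'  # made up

messages = sqs.receive_message(QueueUrl=QUEUE_URL,
                               MaxNumberOfMessages=10).get('Messages', [])

for message in messages:
    # assume each message body just names a task definition to run
    result = ecs.run_task(cluster='default', taskDefinition=message['Body'])

    if result['failures']:
        # ECS rejected the task (e.g. RESOURCE:MEMORY). Doing nothing here means
        # the message becomes visible again after the visibility timeout and gets
        # retried, even if higher-priority messages are still hidden in the queue.
        continue

    # The task was placed, so the message can come out of the queue.
    sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=message['ReceiptHandle'])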

So ideally the scheduler would know the present state of the cluster before actually calling RunTask or StartTask. Unfortunately it’s kind of a pain to query all of the cluster metadata every time we want to run a task: gathering the pertinent information about a given cluster takes four API calls (ListClusters, DescribeClusters, ListContainerInstances, and DescribeContainerInstances) and a fair amount of JSON mangling.

I started thinking about this a little and searching for how other people handle it, and stumbled upon http://williamthurston.com/2015/08/20/create-custom-aws-ecs-schedulers-with-ecs-state.html and https://github.com/jhspaybar/ecs_state

Basically it is a proto-scheduler that keeps cluster state in an in-memory SQLite database, and it provides a nice example of how this sort of thing could be used to build a completely customized StartTask-based scheduler. For example: https://github.com/jhspaybar/ecs_state/blob/6686cdfc418385e8db76d6bc719c7c278a10b471/ecs_state.go#L373

But to get started I really just want to use the RunTask API, so all I really need to know is “is there space in the cluster right now?”. So I wrote a simple tool that caches cluster metadata in Redis, which is easy and fast to query and make simple decisions from.

import boto3
import redis


def summarize_resources(resource_list):
    # Flatten an ECS resource list like [{'name': 'CPU', 'integerValue': 1024}, ...] into a dict
    response = {}
    for r in resource_list:
        response[r['name']] = r['integerValue']
    return response


def cluster_remaining_resources(cluster_name, instance_arns):
    # Sum the cached remaining CPU/memory across all of a cluster's instances
    remaining_cpu = 0
    remaining_memory = 0

    for arn in instance_arns:
        instance_key = '%s:instance:%s' % (cluster_name, arn)
        remaining_cpu += int(redis_client.hget(instance_key, 'remaining_cpu'))
        remaining_memory += int(redis_client.hget(instance_key, 'remaining_memory'))
    return dict(cpu=remaining_cpu, memory=remaining_memory)


pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
redis_client = redis.Redis(connection_pool=pool)

ecs = boto3.client('ecs')

# The four calls: list/describe the clusters, then list/describe each cluster's container instances
cluster_arns = ecs.list_clusters()['clusterArns']
clusters = ecs.describe_clusters(clusters=cluster_arns)['clusters']

for cluster in clusters:
    cluster_arn = cluster['clusterArn']
    container_instance_list = ecs.list_container_instances(cluster=cluster_arn)
    container_instance_arns = container_instance_list['containerInstanceArns']
    instances = ecs.describe_container_instances(cluster=cluster_arn,
                                                 containerInstances=container_instance_arns)

    for i in instances['containerInstances']:
        registered_resources = summarize_resources(i['registeredResources'])
        remaining_resources = summarize_resources(i['remainingResources'])

        instance_state = {
            'status': i['status'],
            'active_tasks': i['runningTasksCount'],
            'registered_cpu': registered_resources['CPU'],
            'registered_memory': registered_resources['MEMORY'],
            'remaining_cpu': remaining_resources['CPU'],
            'remaining_memory': remaining_resources['MEMORY']
        }

        instance_key = '{cluster_arn}:instance:{instance_arn}'
        key = instance_key.format(cluster_arn=cluster_arn,
                                  instance_arn=i['containerInstanceArn'])
        redis_client.hmset(key, instance_state)

    cluster_resources = cluster_remaining_resources(cluster_arn,
                                                    container_instance_arns)

    cluster_rm_key = '%s:remaining_memory' % cluster_arn
    cluster_rcpu_key = '%s:remaining_cpu' % cluster_arn

    redis_client.set(cluster_rcpu_key, cluster_resources['cpu'])
    redis_client.set(cluster_rm_key, cluster_resources['memory'])

Once that is done, you can check whether a cluster has room for a given task definition with something like:

remaining = int(redis_client.get('%s:remaining_memory' % cluster_arn))
if remaining < task_definition_memory_requirement:
    # not enough room right now, don't bother calling RunTask
    pass
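
From there, the placement decision is just a couple of Redis reads before RunTask. A sketch only, with the task’s resource requirements and task definition name made up:

task_memory = 100   # MB the task definition reserves (made up)
task_cpu = 256      # CPU units the task definition reserves (made up)

has_room = (int(redis_client.get('%s:remaining_memory' % cluster_arn)) >= task_memory and
            int(redis_client.get('%s:remaining_cpu' % cluster_arn)) >= task_cpu)

if has_room:
    # let ECS pick a container instance for us
    ecs.run_task(cluster=cluster_arn, taskDefinition='my-task')  # hypothetical task definition
else:
    # leave the SQS message alone so it becomes visible again and gets retried later
    pass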