Skip to content

Latest commit

 

History

History
108 lines (96 loc) · 3.6 KB

graphql-pagination-python.md

File metadata and controls

108 lines (96 loc) · 3.6 KB

Paginating through the GitHub GraphQL API with Python

(See also Building a self-updating profile README for GitHub on my blog)

For my auto-updating personal README I needed to fetch the latest release for every repository I have on GitHub. Since I have 316 public repos I wanted the most efficent way possible to do this. I decided to use the GitHub GraphQL API.

Their API allows you to fetch up to 100 repositories at once, and each one can return up to 100 releases. Since I only wanted the most recent release my query ended up looking like this:

query {
  viewer {
    repositories(first: 100, privacy: PUBLIC) {
      pageInfo {
        hasNextPage
        endCursor
      }
      nodes {
        name
        releases(last:1) {
          totalCount
          nodes {
            name
            publishedAt
            url
          }
        }
      }
    }
  }
}

This gives me back my 100 first repos, and for each one returns the most recent release (if a release exists).

Just one problem: I needed to paginate through all 316. The way you do this with the GitHub GraphQL API is using the after: argument and the endcursor returned from pageInfo. You can send after:null to get the first page, then after:TOKEN where TOKEN is the endCursor from the previous results.

My Python code ended up looking like this (using python-graphql-client):

from python_graphql_client import GraphqlClient

client = GraphqlClient(endpoint="https://api.github.com/graphql")

def make_query(after_cursor=None):
    return """
query {
  viewer {
    repositories(first: 100, privacy: PUBLIC, after:AFTER) {
      pageInfo {
        hasNextPage
        endCursor
      }
      nodes {
        name
        releases(last:1) {
          totalCount
          nodes {
            name
            publishedAt
            url
          }
        }
      }
    }
  }
}
""".replace(
        "AFTER", '"{}"'.format(after_cursor) if after_cursor else "null"
    )


def fetch_releases(oauth_token):
    repos = []
    releases = []
    repo_names = set()
    has_next_page = True
    after_cursor = None

    while has_next_page:
        data = client.execute(
            query=make_query(after_cursor),
            headers={"Authorization": "Bearer {}".format(oauth_token)},
        )
        print()
        print(json.dumps(data, indent=4))
        print()
        for repo in data["data"]["viewer"]["repositories"]["nodes"]:
            if repo["releases"]["totalCount"] and repo["name"] not in repo_names:
                repos.append(repo)
                repo_names.add(repo["name"])
                releases.append(
                    {
                        "repo": repo["name"],
                        "release": repo["releases"]["nodes"][0]["name"]
                        .replace(repo["name"], "")
                        .strip(),
                        "published_at": repo["releases"]["nodes"][0][
                            "publishedAt"
                        ].split("T")[0],
                        "url": repo["releases"]["nodes"][0]["url"],
                    }
                )
        has_next_page = data["data"]["viewer"]["repositories"]["pageInfo"][
            "hasNextPage"
        ]
        after_cursor = data["data"]["viewer"]["repositories"]["pageInfo"]["endCursor"]
    return releases

Full code here: https://github.com/simonw/simonw/blob/50d4188f9f067b68b2203540f1983750d51800db/build_readme.py