This seems to be a constant request on Stack Overflow, and since documentation for Google’s Speech API is practically non-existent, I have decided to share an implementation of it with everyone. If you just want the source code, here you go.
Google Speech API Supported File Types
First off, your audio must be encoded in the FLAC audio format for Google’s Speech API to accept it. We will not be transcoding audio in the Python script, so you will have to do it beforehand. If you need an easy-to-use tool to convert your audio files, give fre:ac a try. It is a free, open-source converter for Windows, Mac OS X, Linux, and FreeBSD. Alright, now on to the good stuff.
FLAC Basics
It is really useful to be able to pull some information out of our FLAC files with the Python script itself, so that we don’t have to depend on a third-party library or application. Luckily for us, FLAC is an open-source format with really clear specifications. Everything we need to extract can be found in the STREAMINFO METADATA_BLOCK, which sits right after the 4-byte “fLaC” marker at the start of the file.
The first METADATA_BLOCK is ALWAYS the STREAMINFO block. The first bit in the METADATA_BLOCK_HEADER simply marks whether this METADATA_BLOCK is the last one. The next 7 bits mark the BLOCK_TYPE, and the last 24 bits give the length of the METADATA that follows the header.
So that should be easy enough to at least get started with confirming that the file we send to our script is actually a FLAC file. Let’s see if we can start reading some info from FLAC files:
```python
with open("test.flac", "rb") as f:
    flacBits = f.read(4)  # Get the Magic Number
    if flacBits != b"fLaC":
        print("not a fLaC file!")
        # We should quit now and return the error
    flacBits = f.read(1)  # Get the first 8 bits of the STREAMINFO METADATA_BLOCK_HEADER
    if ord(flacBits) == 0:
        # STREAMINFO BLOCK FOUND!!
        # do some more stuff...
        pass  # placeholder so the snippet runs as written
```
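Pretty easy, right? Python’s ord() function returns the integer value of a one-character byte string; it is the inverse of chr(), which turns a number into an ASCII character. For a STREAMINFO block that is followed by other metadata blocks, all 8 bits of the first header byte are 0: 0 for the last-block flag, and 0’s for the 7 BLOCK_TYPE bits. Most encoders write at least one more block (comments or padding) after STREAMINFO, so this is what you will usually see. If STREAMINFO happens to be the only metadata block, the flag bit is set and the byte reads 0x80 instead, which this simple check will reject.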
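To make the header layout a bit more concrete, here is a minimal sketch that decodes all four bytes of a METADATA_BLOCK_HEADER in one go instead of looking only at the first byte. The helper name `parse_block_header` is made up for this illustration, and the sketch assumes Python 3.

```python
from struct import unpack


def parse_block_header(header):
    """Decode a 4-byte FLAC METADATA_BLOCK_HEADER (hypothetical helper)."""
    if len(header) != 4:
        raise ValueError("a METADATA_BLOCK_HEADER is exactly 4 bytes")
    (word,) = unpack('>I', header)    # one big-endian 32-bit integer
    is_last = bool(word >> 31)        # first bit: last-metadata-block flag
    block_type = (word >> 24) & 0x7F  # next 7 bits: BLOCK_TYPE (0 = STREAMINFO)
    length = word & 0xFFFFFF          # last 24 bits: size of the block body
    return is_last, block_type, length


# STREAMINFO header, more blocks follow, 34-byte body
print(parse_block_header(b'\x00\x00\x00\x22'))  # (False, 0, 34)
# Same block, but flagged as the last metadata block
print(parse_block_header(b'\x80\x00\x00\x22'))  # (True, 0, 34)
```

Reading the whole header like this also gives you the block length, which is handy if you ever want to skip over blocks you are not interested in.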
STREAMINFO BLOCK
So now that we can determine what is and is not a FLAC file, let’s go ahead and start pulling out the information we need. We only need to tell Google the sample rate of the file; however, since we are already here, why don’t we get as much information as possible?
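The STREAMINFO METADATA_BLOCK looks like this (field widths in bits, in file order):
- 16: minimum block size, in samples
- 16: maximum block size, in samples
- 24: minimum frame size, in bytes
- 24: maximum frame size, in bytes
- 20: sample rate, in Hz
- 3: number of channels, minus 1
- 5: bits per sample, minus 1
- 36: total number of samples in the stream
- 128: MD5 signature of the unencoded audio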
So going through and parsing this block of data is pretty straightforward. Let’s go ahead and just store each value as we go. When it comes to handling raw binary data in Python, struct is a very useful library, and we should use it! The most useful feature will be the unpack() function. Make sure you consult the Format Characters chart if you’re not sure what they mean. All numbers are big-endian, so we put the ‘>’ character before our Format Characters. There is no format character for a 24-bit integer, so the two frame sizes are padded with a leading zero byte and unpacked as 32-bit values.
```python
from struct import unpack

with open("test.flac", "rb") as f:
    flacBits = f.read(4)  # Get the Magic Number
    if flacBits != b"fLaC":
        print("not a fLaC file!")
    flacBits = f.read(1)  # Get the STREAMINFO METADATA_BLOCK_HEADER
    if ord(flacBits) == 0:
        # STREAMINFO BLOCK FOUND!!
        f.read(3)  # Skip the 24-bit block length that follows the header byte
        # Parse the STREAMINFO BLOCK
        minBlockSize, maxBlockSize = unpack('>HH', f.read(4))
        # Frame sizes are 24-bit, so pad them to 32 bits before unpacking
        minFrameSize = unpack('>I', b'\x00' + f.read(3))[0]
        maxFrameSize = unpack('>I', b'\x00' + f.read(3))[0]
        sampleInfo = f.read(8)
        sampleInfoBytes = unpack('>Q', sampleInfo)[0]
        sampleRate = sampleInfoBytes >> 44                    # top 20 bits
        channels = ((sampleInfoBytes >> 41) & 7) + 1          # next 3 bits
        bitsperSample = ((sampleInfoBytes >> 36) & 0x1F) + 1  # next 5 bits
        # The low 36 bits hold the total sample count
        length = (sampleInfoBytes & 0xFFFFFFFFF) / float(sampleRate)
```
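Bit shifts like these are easier to trust once you have seen them round-trip. The sketch below packs a made-up STREAMINFO body (44.1 kHz, stereo, 16-bit, 221,184 samples; the values are chosen purely for illustration) and unpacks it again using the 20-, 3-, 5- and 36-bit fields. `build_streaminfo` and `parse_streaminfo` are hypothetical helpers written for Python 3, not part of the script.

```python
from struct import pack, unpack


def build_streaminfo(sample_rate, channels, bits, total_samples, block_size=4096):
    """Pack the first 18 bytes of a STREAMINFO body (the MD5 is left out)."""
    info = ((sample_rate << 44) | ((channels - 1) << 41)
            | ((bits - 1) << 36) | total_samples)
    # Both 24-bit frame sizes are written as 0, which FLAC uses for "unknown".
    return pack('>HH', block_size, block_size) + b'\x00' * 6 + pack('>Q', info)


def parse_streaminfo(body):
    """Unpack the block sizes and the packed 64-bit sample info word."""
    min_block, max_block = unpack('>HH', body[0:4])
    info = unpack('>Q', body[10:18])[0]
    return {
        'min_block': min_block,
        'max_block': max_block,
        'sample_rate': info >> 44,                     # top 20 bits
        'channels': ((info >> 41) & 7) + 1,            # next 3 bits
        'bits_per_sample': ((info >> 36) & 0x1F) + 1,  # next 5 bits
        'total_samples': info & 0xFFFFFFFFF,           # low 36 bits
    }


fields = parse_streaminfo(build_streaminfo(44100, 2, 16, 221184))
print(fields['sample_rate'], fields['channels'], fields['bits_per_sample'])  # 44100 2 16
print(round(fields['total_samples'] / fields['sample_rate'], 3))             # 5.016
```

Note that the total sample count needs the full 36-bit mask (0xFFFFFFFFF); a shorter mask would silently truncate the length of longer recordings.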
That’s all the information we need from the FLAC file. How about we turn this into a Class so that it is easier to use? We will rename some things and add some error messages so we can catch problems before trying to send the file to Google.
```python
class fLaC_Reader(object):
    error = ""
    sampleRate = 0
    channels = 0
    bitsperSample = 0
    length = 0.0
    minBlockSize, maxBlockSize = 0, 0

    def __init__(self, file):
        header = file.read(4)  # get Magic Number
        if header != b"fLaC":
            self.error += "Not a fLaC file! Aborting\n"
            return  # nothing after this point can be trusted
        header = file.read(1)  # Get STREAMINFO metadata Block Header
        if ord(header) == 0:
            # "STREAMINFO BLOCK FOUND"
            # Jump to the STREAMINFO Block, 24 bits from here.
            file.read(3)
            # parse STREAMINFO BLOCK
            self.minBlockSize, self.maxBlockSize = unpack('>HH', file.read(4))
            # unpack() returns a tuple, so take the first element
            self.minFrameSize = unpack('>I', b'\x00' + file.read(3))[0]
            self.maxFrameSize = unpack('>I', b'\x00' + file.read(3))[0]
            if self.minBlockSize < 16 or self.maxBlockSize < 16:
                self.error += "Invalid Block Size! Aborting!\n"
            # if minBlockSize == maxBlockSize:
            #     print "Fixed Blocksize: %d samples" % maxBlockSize
            sampleInfo = file.read(8)
            sampleInfoBytes = unpack('>Q', sampleInfo)[0]
            self.sampleRate = sampleInfoBytes >> 44
            self.channels = ((sampleInfoBytes >> 41) & 7) + 1
            self.bitsperSample = ((sampleInfoBytes >> 36) & 0x1F) + 1
            # total samples is a 36-bit field
            self.length = (sampleInfoBytes & 0xFFFFFFFFF) / float(self.sampleRate)
        else:
            self.error += "STREAMINFO BLOCK not first\n"
```
Perfect. Now, to get all of the information about a FLAC file, we can instantiate the class with the open file object:
```python
with open("test.flac", "rb") as f:
    flacFile = fLaC_Reader(f)
```
Now we can pull out the information we need like this:
```python
flacFile.sampleRate
```
Google Speech to Text API Basics
Now that we can get the information we need out of a FLAC file, we can send it to Google for transcription. There are a couple of endpoints for the Google Speech to Text API; we will be using Google’s full-duplex API. The full-duplex version does not have a limit on file size or length, and is what Chrome uses for its fancy Web Speech API. The only problems with the full-duplex one are that it requires an API key, and that it can be a little tricky to use because Google has not made ANY documentation available for it. For more details about the other API endpoints, or how to get an API key, see my earlier post about it.
The steps to call Google’s Speech to Text API are:
- Connect the Download stream – https://www.google.com/speech-api/full-duplex/v1/down
  - Parameters:
    - pair
- Connect the Upload stream – https://www.google.com/speech-api/full-duplex/v1/up
  - Parameters:
    - key: API Key
    - pair: A random string of letters and numbers used to join the Upload stream to the Download stream
    - lang: What language, e.g. “en-US”
    - continuous: keep the connections open
    - interim: send back information as it becomes available (before final: true)
    - pFilter*: Profanity filter (0: none, 1: some, 2: strict)
    - There are a lot of other options, like grammar for specifying a particular grammar engine. To be honest, I have no idea what grammars are available.
- Upon successful connection, Google will send back an empty result: {"result":[]}
- Keep the Download & Upload streams open until Google finishes responding
- Google will signal the transcription is finished with a final: true tag in the JSON object
- Files with long silences in them will have multiple final: true sections returned!
- Google will also send a response to the Upload stream connection to signal there is nothing else to process
- Close the Upload stream
- Close the Download stream
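The important detail in those steps is that both connections carry the same pair value. Here is a rough sketch of building the two URLs from the parameters listed above, using only the standard library. `build_stream_urls` is a hypothetical helper, `YOUR_API_KEY` is a placeholder, and the code assumes Python 3.

```python
import random
from urllib.parse import urlencode

BASE = "https://www.google.com/speech-api/full-duplex/v1"


def build_stream_urls(api_key, lang="en-US", pfilter=0):
    """Return (upstream_url, downstream_url) sharing one random pair value."""
    pair = '%016x' % random.getrandbits(64)  # 16 hex digits
    down = BASE + "/down?" + urlencode({"pair": pair})
    up = BASE + "/up?" + urlencode(
        {"key": api_key, "pair": pair, "lang": lang, "pFilter": pfilter})
    # continuous and interim are passed as bare flags without a value
    up += "&continuous&interim"
    return up, down


up_url, down_url = build_stream_urls("YOUR_API_KEY")
print(up_url)
print(down_url)
```

Generating the pair inside one function makes it hard to accidentally open the two streams with different values, in which case Google would have no way to match them up.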
The tricky part is getting the Download and Upload streams to function simultaneously and asynchronously. Sounds like a perfect job for Python Threads and the incredibly useful Requests library!
Let’s go ahead and start with putting this into a Class as well. We will need to store the results in a list in case our audio file has a large gap of silence in it, as Google will send a separate transcription for each part. We can use our fLaC_Reader class to pull out the information we need as well. For the Upstream, we also need to set a ‘Content-Type’ header with the value ‘audio/x-flac; rate=OUR SAMPLE RATE’. If the rate does not match the file, Google will not process the audio correctly and will send back some very strange results.
```python
class GoogleSpeechAPI(object):
    result = ''
    length = 0
    sampleRate = 0

    def __init__(self, file):
        self.result = []
        self.file = file
        self.flac = fLaC_Reader(file)
        self.length = self.flac.length
        self.sampleRate = self.flac.sampleRate
        self.upstream_url = "https://www.google.com/speech-api/full-duplex/v1/up?key=%(key)s&pair=%(pair)s&lang=en-US&client=chromium&continuous&interim&pFilter=0"
        self.upstream_headers = {'content-type': 'audio/x-flac; rate=' + str(self.flac.sampleRate)}
        self.downstream_url = "https://www.google.com/speech-api/full-duplex/v1/down?pair=%(pair)s"
        self.api_key = "ENTER YOUR API KEY HERE"
        self.timeSinceResponse = 0
        self.response = ""
        self.connectionSuccessful = False
        self.no_result = False
```
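Some other functions that we need in the class to make it work are:
```python
    def getPair(self):
        # 16 hex digits; formatting directly avoids slicing off the "0x" prefix
        # (and the trailing "L" that Python 2 adds to long integers)
        return '%016x' % random.getrandbits(64)
```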
This generates a random pair value for us to use. The next one we need is the one that yields the data for the Upstream:
```python
    def gen_data(self):
        while True:
            # integer division: read() needs a whole number of bytes
            item = self.file.read((self.flac.minBlockSize * self.flac.bitsperSample) // 8)
            if item:
                yield item
            else:
                if self.no_result or self.timeSinceResponse > 2:
                    return  # Google is Done Responding, close UpStream
                time.sleep(.5)
                self.timeSinceResponse += .5
                yield b"00000000"  # dummy data to keep the Upstream open
```
It goes through the file in manageable blocks sized from the BlockSize and bitsperSample, dividing by 8 to turn bits into bytes. Once it is done uploading, since we can’t easily check the upstream for a response from Google, we use the interim results coming back on the Downstream to gauge when Google is done with the file. If we hear nothing for 2 seconds after Google has started sending results, it’s pretty safe to assume we’re done. In the meantime, we need to keep the Upstream open by sending Google dummy data. In this case it’s just a string of 8 zeros.
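If you want to see that upload pattern work without touching the network, here is a small standalone variation. It is not the class method: the "are we done?" check is passed in as a callable and the sleep function is injectable, both of which are my own assumptions to make the generator easy to exercise. `chunked_upload` is a hypothetical name, and the code targets Python 3.

```python
import io
import time


def chunked_upload(fileobj, chunk_size, is_done, keepalive=b"00000000",
                   poll=0.5, sleep=time.sleep):
    """Yield file chunks, then keep-alive filler until is_done() returns True."""
    while True:
        chunk = fileobj.read(chunk_size)
        if chunk:
            yield chunk          # still uploading real audio
            continue
        if is_done():
            return               # ending the generator closes the upload
        sleep(poll)              # wait a bit before sending more filler
        yield keepalive


audio = io.BytesIO(b"x" * 20)          # stand-in for an open FLAC file
answers = iter([False, False, True])   # "done" on the third check
sent = list(chunked_upload(audio, 8, lambda: next(answers), sleep=lambda s: None))
print(sent)
# [b'xxxxxxxx', b'xxxxxxxx', b'xxxx', b'00000000', b'00000000']
```

Because the generator only finishes when the check says so, whatever consumes it (Requests, in our case) keeps the connection open for exactly as long as we keep yielding.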
The last helper function we need is one to help us check if a response is a final:true response.
```python
    def final(self):
        try:
            response = json.loads(self.response)
            if response['result']:
                if 'final' in response['result'][0]:
                    return response['result'][0]['final']
        except Exception:
            # assuming invalid JSON
            return False
        return False
```
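The same check is easy to try out on its own as a plain function. This is a sketch for Python 3; `is_final` is a hypothetical name, the interim-style sample is invented for the demonstration, and the final sample is the response shown in the output at the end of this post.

```python
import json


def is_final(line):
    """Return True if a response line carries a final transcription."""
    try:
        response = json.loads(line)
    except ValueError:  # not valid JSON (JSONDecodeError is a ValueError)
        return False
    results = response.get('result') if isinstance(response, dict) else None
    if not results:
        return False    # empty result list: handshake or "nothing found"
    return bool(results[0].get('final', False))


final_line = ('{"result":[{"alternative":[{"transcript":"testing Google '
              'transcription service 12234","confidence":0.65005225}],'
              '"final":true}],"result_index":0}')
print(is_final(final_line))                                               # True
print(is_final('{"result":[]}'))                                          # False
print(is_final('{"result":[{"alternative":[{"transcript":"testing"}]}]}'))  # False
```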
Okay. Now on to controlling the threading and the Upload and Download streams. How about a function aptly named start()? This will kick off the threads for the Upload and Download streams, and wait for them once the streams are finished. We use a separate Requests Session for each Thread, as a Session is not thread-safe (although sharing one Session between the upstream and downstream usually works). We can also skip the whole hassle of sending the file to Google if it’s not a valid FLAC file.
```python
    def start(self):
        if self.flac.error != "":
            self.result = self.flac.error
            return
        pair = self.getPair()
        upstream_url = self.upstream_url % {"pair": pair, "key": self.api_key}
        downstream_url = self.downstream_url % {"pair": pair, "key": self.api_key}
        self.file.seek(0)
        self.upsession = requests.Session()
        self.downsession = requests.Session()
        self.upstream_thread = Thread(target=self.upstream, args=(upstream_url,))
        self.downstream_thread = Thread(target=self.downstream, args=(downstream_url,))
        self.downstream_thread.start()
        self.upstream_thread.start()
        self.stop()
```
The stop() function is simple. We call join() on both of the threads. join() simply blocks until the thread has finished, so the rest of the Python script waits for both calls to return before executing the next line. That means we can actually time how long Google takes to transcribe an audio file.
```python
    def stop(self):
        self.downstream_thread.join()
        self.upstream_thread.join()
```
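If join() is new to you, this tiny standalone example shows the blocking behaviour. Two worker threads sleep for made-up delays in place of the real streams, and the main thread does not get past the join() calls until both are done. Python 3 is assumed.

```python
import time
from threading import Thread

finished = []


def worker(name, delay):
    time.sleep(delay)       # stand-in for a network stream doing its work
    finished.append(name)


down = Thread(target=worker, args=("down", 0.2))
up = Thread(target=worker, args=("up", 0.1))
down.start()
up.start()

started = time.time()
down.join()                 # blocks until the "down" worker returns
up.join()                   # already finished by now, returns immediately
elapsed = time.time() - started

print(sorted(finished), down.is_alive(), up.is_alive())  # ['down', 'up'] False False
```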
Okay, now for the upstream process. Using requests, it’s pretty simple. We don’t need to capture any of the results of the upstream connection, but we should automatically retry if the connection is unsuccessful:
```python
    def upstream(self, url):
        result = self.upsession.post(url, headers=self.upstream_headers, data=self.gen_data())
        if result.status_code != 200:
            self.start()
```
The Downstream is a little more complicated. We are going to mimic the Deterministic Finite Automaton that Chrome uses for interacting with the Speech API, but in a much more basic and crude way. We need to know which state we are in: 1. not connected, 2. successfully connected, 3. received a second empty response after a successful connection, 4. started receiving responses, or 5. received a final result. With the stream=True parameter we can read the responses as they arrive using the iter_lines() method. We store each response in self.response, as that is what self.final() checks. If it’s a final result we add it to our results and keep going. Our Downstream keeps listening for responses until the Upstream is closed, and the Upstream only closes once the Downstream stops receiving responses. So every time we get a result back from Google we reset the timer to keep the Upload stream going. If we don’t hear anything for 2 seconds, the Upstream is closed and the Downstream finishes. Pretty cool, huh? Just like the upstream, we also restart the connection if it was unable to connect.
```python
    def downstream(self, url):
        r = self.downsession.get(url, stream=True)
        self.status_code = r.status_code
        if r.status_code == 200:
            for line in r.iter_lines():
                self.timeSinceResponse = 0
                self.response = line
                # iter_lines() yields byte strings, so compare against bytes
                if line == b'{"result":[]}':
                    # Google sends back an empty result signifying a successful connection
                    if not self.connectionSuccessful:
                        self.connectionSuccessful = True
                    else:
                        # another empty response means Google couldn't find anything in the audio ...
                        # Make the result pretty / match normal results
                        self.result.append(b'{"result":[{"alternative":[{"transcript":"","confidence":0.99999}],"final":true}],"result_index":0}')
                        self.no_result = True
                if self.final():
                    self.result.append(line)
                    self.response = ""
        else:
            self.start()
```
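The state handling is easier to follow when it is pulled out of the networking code. Below is a simplified sketch of the same idea as a pure function over a list of response lines, written for Python 3. `collect_finals` is a hypothetical helper, and the sample lines are made up except for the empty-result handshake, which is the one described above.

```python
import json

EMPTY = '{"result":[]}'


def collect_finals(lines):
    """Walk response lines; return (final_results, no_speech_detected)."""
    connected = False
    no_speech = False
    finals = []
    for line in lines:
        if isinstance(line, bytes):
            line = line.decode('utf-8')
        if line == EMPTY:
            if not connected:
                connected = True   # first empty result: connection established
            else:
                no_speech = True   # second empty result: nothing recognised
            continue
        try:
            results = json.loads(line).get('result') or []
        except (ValueError, AttributeError):
            continue               # blank keep-alive line or unexpected payload
        if results and results[0].get('final'):
            finals.append(line)
    return finals, no_speech


stream = [
    b'{"result":[]}',
    b'',
    b'{"result":[{"alternative":[{"transcript":"testing"}]}]}',
    b'{"result":[{"alternative":[{"transcript":"testing 123"}],"final":true}]}',
]
print(collect_finals(stream))
print(collect_finals([EMPTY, EMPTY]))  # ([], True)
```

Separating the parsing from the HTTP call like this also means you can test the tricky cases (silence, multiple finals) from canned data instead of burning API quota.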
Well, that is all you need to have audio files transcribed by Google! Let’s write a quick script that utilizes our two new classes!
I want to time how long it takes so I’m going to write a quick Class for timing code:
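```python
class Timer:
    def __enter__(self):
        # time.time() measures wall-clock time; time.clock() measured CPU time
        # on some platforms and was removed in Python 3.8
        self.start = time.time()
        return self

    def __exit__(self, *args):
        self.end = time.time()
        self.interval = self.end - self.start
```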
I will also need to add a few imports at the top to make everything work.
```python
import random
import json
from threading import Thread
from struct import unpack
import time
import requests  # External Library http://docs.python-requests.org/en/latest/user/install/#install
```
Now I can simply open a file, start the transcription, and time it!
```python
with open("test.flac", 'rb') as f:
    result = GoogleSpeechAPI(f)
    print("Audio is %.03f seconds long" % result.length)
    f.seek(0)
    with Timer() as t:
        result.start()
    print("Result took %.03f sec" % t.interval)
    print(result.result)
```
The output looks like this:
```
opening test.flac:
Audio is 5.016 seconds long
Result took 6.372 sec
['{"result":[{"alternative":[{"transcript":"testing Google transcription service 12234","confidence":0.65005225}],"final":true}],"result_index":0}']
```
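Since the results come back as raw JSON strings, you will probably want to pull the text out of them. Here is a minimal sketch for Python 3 that does so for a list shaped like the output above. `best_transcripts` is a hypothetical helper, and taking the first entry of alternative as the best guess is my assumption.

```python
import json


def best_transcripts(results):
    """Return (transcript, confidence) for every final result in the list."""
    out = []
    for raw in results:
        for result in json.loads(raw).get('result', []):
            if result.get('final') and result.get('alternative'):
                best = result['alternative'][0]  # assumed to be the top guess
                out.append((best.get('transcript', ''), best.get('confidence')))
    return out


results = ['{"result":[{"alternative":[{"transcript":"testing Google '
           'transcription service 12234","confidence":0.65005225}],'
           '"final":true}],"result_index":0}']
print(best_transcripts(results))
# [('testing Google transcription service 12234', 0.65005225)]
```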
Well, I hope you found this tutorial enlightening. Special thanks go out to the Chromium Project for making all of their code available 🙂
Here is the complete source code / working example (minus an API key).
Happy Transcribing!