Commit 01b8230d authored by d.basulto's avatar d.basulto

reconnect functionality added

parent c9ad4f92
......@@ -7,65 +7,75 @@
* @note streamRecorder http://streamingmovil.radioformula.com:8000/m1033 3000
*/
////////////////////////////////////////////////////////////////////////////////
#include "StreamRecorder.h"
#include <fstream>
#include <iostream>
#include <ctime>
#include <cmath>
#include <sstream>
#include <gst/gst.h>
#include <glib.h>
#include <unistd.h>
#include <cstring>
#include "FLAC/metadata.h"
#include "FLAC/stream_encoder.h"
#include <cstring>
using namespace std;
//------------------------------------------------------------------------------
/**
* Constructor
* @param source streaing url
* @param source streaming url
* @param time split recording in segments of time seconds.
*/
StreamRecorder::StreamRecorder(const char* source, int time)
{
int nFrames = ceil(time*STREAMRECORDER_SAMPLERATE/READSIZE);
recordTime = nFrames*READSIZE/STREAMRECORDER_SAMPLERATE;
cout << "record time: " << recordTime << endl;
audioFileDuration = time;
strcpy(pluginUri,source);
bufferSize=nFrames*READSIZE*STREAMRECORDER_BYTESPERSAMPLE;
audioBuffer = new unsigned char[bufferSize];
memset(audioBuffer, 0, bufferSize);
// New buffer
memset(audioBuffer, 0, bufferSize);
audioBufferPosition=audioBuffer;
nBytes=0;
isConnectionLost=false;
createMainPipeline();
connect(source);
connect();
}
//------------------------------------------------------------------------------
/**
* Connect to the stream
* @param uri streaing uri
* @return unimplemented
*/
int StreamRecorder::connect(const char *uri)
int StreamRecorder::connect()
{
disconnect();
cout << "connecting to " << uri << endl;
pluginUri = (char*)uri;
cout << "connecting to " << pluginUri << endl;
gst_element_set_state (mainPipeline, GST_STATE_NULL);
gst_element_set_state (mainPipeline, GST_STATE_READY);
g_object_set (G_OBJECT (streamSrc), "uri", uri, NULL);
g_object_set (G_OBJECT (streamSrc), "uri", pluginUri, NULL);
gst_element_link (streamSrc, audioConvert);
gst_element_set_state (mainPipeline, GST_STATE_PLAYING);
return 0;
}
//------------------------------------------------------------------------------
/**
* disconnect from the stream
* @return unimplemented
......@@ -76,7 +86,9 @@ int StreamRecorder::disconnect()
gst_element_set_state (mainPipeline, GST_STATE_NULL);
return 0;
}
//------------------------------------------------------------------------------
/**
* Create main pipeline
* @return 0 on success else on error
......@@ -129,14 +141,24 @@ int StreamRecorder::createMainPipeline()
return 0;
}
//------------------------------------------------------------------------------
gboolean StreamRecorder::reconnectURIStream(void *instance)
/**
*
* @param class instance
* @return False if an error occurs
*/
gboolean StreamRecorder::reconnectURIStream(void* instance)
{
cout << "reconnectURIStream" << endl;
((StreamRecorder*)instance)->connect(((StreamRecorder*)instance)->pluginUri);
cout << "\nTrying to reconnect with the stream..." << endl << endl;
((StreamRecorder*)instance)->connect();
return FALSE;
}
//------------------------------------------------------------------------------
/**
* disconnect from the stream
* @param the GstBus that sent the message
......@@ -152,46 +174,143 @@ int StreamRecorder::bus_callback (GstBus *bus, GstMessage *message, void *user_d
((StreamRecorder*)user_data)->isConnectionLost = true;
}
switch (GST_MESSAGE_TYPE (message))
{
case GST_MESSAGE_EOS:
cout << "End of stream" << endl;
cout << "End sometimes src" << endl;
cout << "** End of stream **\n" << endl;
if(((StreamRecorder*)user_data)->isConnectionLost) //Enter only if the connection is lost
{
// ----------------------------------------------------------------------------------------
long int actualRecordTime;
long int currentTime = time(NULL);
//cout << "Audio start time (timestamp): " << ((StreamRecorder*)user_data)->timestamp << endl;
//cout << "Actual time (timestamp): " << currentTime << endl;
actualRecordTime = currentTime-((StreamRecorder*)user_data)->timestamp;
cout << "Record time: " << actualRecordTime << endl;
// ----------------------------------------------------------------------------------------
/** Record time is greater than th required */
if(actualRecordTime >= ((StreamRecorder*)user_data)->audioFileDuration)
{
cout << "Bytes readed: " << ((StreamRecorder*)user_data)->nBytes << endl;
saveBuffer(user_data);
}
else
{
/** Moves the pointer to the position corresponding to the difference of the timestamps */
long int bytesToAdd = actualRecordTime*READSIZE*STREAMRECORDER_BYTESPERSAMPLE*10;
cout << "-------------------------------------" << endl;
cout << "Actual Bytes number: " << "(" << ((StreamRecorder*)user_data)->nBytes << ") + ";
cout << "Bytes t/add: " << "(" << bytesToAdd << ")" << endl;
/** Update the pointer and the bytes number */
((StreamRecorder*)user_data)->nBytes+=bytesToAdd;
((StreamRecorder*)user_data)->audioBufferPosition+=bytesToAdd;
cout << "Result : " << ((StreamRecorder*)user_data)->nBytes << endl;
cout << "Buffer size: " << ((StreamRecorder*)user_data)->bufferSize << endl;
cout << "-------------------------------------" << endl;
}
((StreamRecorder*)user_data)->isConnectionLost = false;
/** Try reconnect with the radio stream*/
g_timeout_add(5*1000, reconnectURIStream, user_data);
}
break;
case GST_MESSAGE_ERROR:
gchar *debug;
GError *error;
gst_message_parse_error (message, &error, &debug);
g_free (debug);
cerr << "Error: "<< error->message << endl; //Print specific error
strcpy(((StreamRecorder*)user_data)->errorMessage,error->message);
/** The message doesn't contains null character*/
((StreamRecorder*)user_data)->errorMessage[ERROR_MSG_SIZE-1] = '\0';
cerr << "Error: "<< ((StreamRecorder*)user_data)->errorMessage << endl << endl;
g_error_free (error);
g_timeout_add(60*1000, reconnectURIStream, user_data); //Try to reconnect with the uri stream
if(strcmp(((StreamRecorder*)user_data)->errorMessage, "Stream doesn't contain enough data.") == 0)
{
if(((StreamRecorder*)user_data)->isConnectionLost) //Enter only if the connection is lost
/** Not audio stream received */
if(((StreamRecorder*)user_data)->timestamp == 0)
{
g_timeout_add(5*1000, reconnectURIStream, user_data);
}
else
{
long int actualRecordTime;
long int currentTime = time(NULL);
//cout << "Audio start time (timestamp): " << ((StreamRecorder *) user_data)->timestamp << endl;
//cout << "Actual time (timestamp): " << currentTime << endl;
actualRecordTime = currentTime - ((StreamRecorder *) user_data)->timestamp;
cout << "Record time: " << actualRecordTime << endl;
/** Record time is greater than th required */
if (actualRecordTime >= ((StreamRecorder *) user_data)->audioFileDuration)
{
cout << "Bytes readed: " << ((StreamRecorder *) user_data)->nBytes << endl;
saveBuffer(user_data);
}
else
{
savePartialBuffer(user_data);
/** Moves the pointer to the position corresponding to the difference of the timestamps */
long int bytesToAdd = actualRecordTime * READSIZE * STREAMRECORDER_BYTESPERSAMPLE * 10;
cout << "-------------------------------------" << endl;
cout << "Actual Bytes number: " << "(" << ((StreamRecorder *) user_data)->nBytes << ") + ";
cout << "Bytes t/add: " << "(" << bytesToAdd << ")" << endl;
/** Update the pointer and the bytes number */
((StreamRecorder *) user_data)->nBytes += bytesToAdd;
((StreamRecorder *) user_data)->audioBufferPosition += bytesToAdd;
cout << "Result : " << ((StreamRecorder *) user_data)->nBytes << endl;
cout << "Buffer size: " << ((StreamRecorder *) user_data)->bufferSize << endl;
cout << "-------------------------------------" << endl;
}
((StreamRecorder*)user_data)->isConnectionLost = false;
/** Try reconnect with the radio stream*/
g_timeout_add(5*1000, reconnectURIStream, user_data);
}
}
break;
break;
default:
break;
}
return TRUE;
}
//------------------------------------------------------------------------------
void StreamRecorder::savePartialBuffer(void *user_data)
void StreamRecorder::saveBuffer(void *user_data)
{
int missingBytes = ((StreamRecorder*)user_data)->bufferSize - ((StreamRecorder*)user_data)->nBytes;
((StreamRecorder*)user_data)->audioBufferPosition+=missingBytes;
//((StreamRecorder*)user_data)->audioBufferPosition+=missingBytes;
((StreamRecorder*)user_data)->nBytes+=missingBytes;
((StreamRecorder*)user_data)->compressBuffer();
((StreamRecorder*)user_data)->audioBufferPosition=((StreamRecorder*)user_data)->audioBuffer;
memset (((StreamRecorder*)user_data)->audioBuffer, 0, ((StreamRecorder*)user_data)->bufferSize);
((StreamRecorder*)user_data)->nBytes=0;
}
//------------------------------------------------------------------------------
/**
......@@ -231,6 +350,88 @@ void StreamRecorder::srcNewPad_callback(GstElement *element, GstPad *pad, void *
}
}
//------------------------------------------------------------------------------
/**
* CallBack for handoff signal of identity filter
* @param filter Identity filter
* @param buffer The buffer that just has been received
* @param user_data this
* @return unimplemented
*/
int StreamRecorder::filter_handoff_callback(GstElement* filter, GstBuffer* buffer, void* user_data)
{
GstMapInfo info;
if(!gst_buffer_map (buffer, &info, GST_MAP_READ))
cout << "ERROR: MAPPING IS NOT VALID" << endl;
//GST_BUFFER_DATA is for gst v0.1
// ((StreamRecorder*)user_data)->addToBuffer((unsigned char*)GST_BUFFER_DATA (buffer));
// user data is the class
((StreamRecorder*)user_data)->addToBuffer((unsigned char*)info.data, info.size);
gst_buffer_unmap (buffer, &info);
return 0;
}
//------------------------------------------------------------------------------
/**
* Add audio data to audioBuffer
* @param data Audio data to add
* @param length Data length
* @return Bytes writen
*/
int StreamRecorder::addToBuffer(unsigned char* data, int length) {
int bytesRead = length;// READSIZE*STREAMRECORDER_BYTESPERSAMPLE;
long int currentTime;
long int actualRecordTime;
bool isNewAudioFile;
/** Useful for obtain the filename*/
isNewAudioFile = nBytes == 0;
cout << "Data size: " << bytesRead << endl;
//if(nBytes < bufferSize){
/** Update pointer*/
memcpy((char*)audioBufferPosition, (char*)data, bytesRead);
nBytes+=bytesRead;
audioBufferPosition+=bytesRead;
//}
if(isNewAudioFile)
{
cout << "New audio stream" << endl;
/** filename */
timestamp = time(NULL);
cout << "Audio filename (timestamp): " << timestamp << endl;
}
else
{
currentTime = time(NULL);
actualRecordTime = currentTime-timestamp;
cout << "Record time: " << actualRecordTime << endl;
cout << "Bytes readed " << nBytes << endl;
cout << "Buffer size " << bufferSize << endl;
/** If the buffer is full, save it in flac file */
if(nBytes >= bufferSize)
{
compressBuffer();
audioBufferPosition=audioBuffer;
memset (audioBuffer, 0, bufferSize);
nBytes=0;
}
}
return nBytes;
}
//------------------------------------------------------------------------------
/**
* Save audio data (audioBuffer) in flac format
* @return unimplemented
......@@ -238,15 +439,18 @@ void StreamRecorder::srcNewPad_callback(GstElement *element, GstPad *pad, void *
int StreamRecorder::compressBuffer()
{
long int currentTime = time(NULL);
//long int currentTime = time(NULL);
stringstream ss;
string fileNameStr, currentTimeStr;
ss << currentTime << endl;
ss << timestamp << endl;
getline(ss, currentTimeStr);
ss << currentTimeStr << ".flac" << endl;
getline(ss, fileNameStr);
// Restart timestamp;
timestamp = 0;
int readsize = READSIZE;
FLAC__bool ok = true;
FLAC__StreamEncoder *encoder = 0;
......@@ -344,58 +548,5 @@ int StreamRecorder::compressBuffer()
return 0;
}
//------------------------------------------------------------------------------
/**
* Add audio data to audioBuffer
* @param data Audio data to add
* @param length Data length
* @return Bytes writen
*/
int StreamRecorder::addToBuffer(unsigned char* data, int length) {
//cout << "addToBuffer(" << length << ")" << endl;
int bytesRead = length;// READSIZE*STREAMRECORDER_BYTESPERSAMPLE;
memcpy((char*)audioBufferPosition, (char*)data, bytesRead);
cout << *audioBufferPosition << endl;
audioBufferPosition+=bytesRead;
nBytes+=bytesRead;//READSIZE;
cout << "Bytes readed " << nBytes << endl;
cout << "Buffer size " << bufferSize << endl;
if(nBytes >= bufferSize)
{
compressBuffer();
audioBufferPosition=audioBuffer;
memset (audioBuffer, 0, bufferSize);
nBytes=0;
}
return nBytes;
}
//------------------------------------------------------------------------------
/**
* CallBack for handoff signal of identity filter
* @param filter Identity filter
* @param buffer The buffer that just has been received
* @param user_data this
* @return unimplemented
*/
int StreamRecorder::filter_handoff_callback(GstElement* filter, GstBuffer* buffer, void* user_data)
{
GstMapInfo info;
if(!gst_buffer_map (buffer, &info, GST_MAP_READ))
cout << "ERROR: MAPPING IS NOT VALID" << endl;
//GST_BUFFER_DATA is for gst v0.1
// ((StreamRecorder*)user_data)->addToBuffer((unsigned char*)GST_BUFFER_DATA (buffer));
// user data is the class
((StreamRecorder*)user_data)->addToBuffer((unsigned char*)info.data, info.size);
gst_buffer_unmap (buffer, &info);
return 0;
}
//------------------------------------------------------------------------------
......@@ -11,61 +11,82 @@
#ifndef STREAMRECORDER_H
#define STREAMRECORDER_H
// your public header include
//------------------------------------------------------------------------------
/** Your public header include */
#include <gst/gst.h>
#include <jmorecfg.h>
//------------------------------------------------------------------------------
#define STREAMRECORDER_SAMPLERATE 44100
#define READSIZE 1152 //For MPEG1, frame_size = 1152 samples/frame
#define STREAMRECORDER_BYTESPERSAMPLE 2
// the declaration of your class...
#define ERROR_MSG_SIZE 50
#define DST_URI_SIZE 80
//------------------------------------------------------------------------------
/** Class declaration */
class StreamRecorder
{
private:
unsigned int nBytes;
unsigned int bufferSize;
unsigned char* audioBuffer;
unsigned char* audioBufferPosition;
char errorMessage[ERROR_MSG_SIZE];
char pluginUri[DST_URI_SIZE];
unsigned int nBytes;
unsigned int bufferSize;
int recordTime;
int audioFileDuration;
/** Audio filename */
long int timestamp = 0;
bool isConnectionLost;
char * pluginUri;
//char* sourceName;
//GstElement* audioResample;
//GstElement* tempBin;
//GstElement* audioSink;
GstElement* streamSrc;
GstElement* audioConvert;
//GstElement* audioResample;
GstElement* filterCaps;
GstElement* queue0;
GstElement* queue1;
GstElement* filter;
GstElement* fakeSink;
//GstElement* audioSink;
GstElement* mainPipeline;
//GstElement* tempBin;
int createMainPipeline();
int connect(const char *uri);
int connect();
int disconnect();
// callbacks
static void srcNewPad_callback(GstElement *element, GstPad *pad, void *data);
static int bus_callback(GstBus *bus, GstMessage *message, void *data);
static int filter_handoff_callback(GstElement* filter, GstBuffer* buffer, void* user_data);
// add data to buffer
/** add data to buffer */
int addToBuffer(unsigned char* data, int length);
int compressBuffer();
// Save information when connection fails
static void savePartialBuffer(void *user_data);
/** plugin's callbacks */
static void srcNewPad_callback(GstElement *element, GstPad *pad, void *data);
static int bus_callback(GstBus *bus, GstMessage *message, void *data);
static int filter_handoff_callback(GstElement *filter, GstBuffer* buffer, void* user_data);
/** Save audio*/
static void saveBuffer(void *user_data);
// Restart the pipeline
static gboolean reconnectURIStream(void *data);
/** Restart the pipeline */
static gboolean reconnectURIStream(void* data);
public:
StreamRecorder(const char* source, int time);
};
//------------------------------------------------------------------------------
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment