Skip to content
This repository has been archived by the owner on Feb 28, 2022. It is now read-only.

Commit

Permalink
landingzone imported from github.com/seadsystem/Backend/DB/landingzone
Browse files Browse the repository at this point in the history
  • Loading branch information
iangudger committed Oct 11, 2015
0 parents commit d8136f2
Show file tree
Hide file tree
Showing 18 changed files with 1,718 additions and 0 deletions.
47 changes: 47 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[![Build Status](https://travis-ci.org/seadsystem/Backend.svg)](https://travis-ci.org/seadsystem/Backend) [![Coverage Status](https://coveralls.io/repos/seadsystem/Backend/badge.svg?branch=master&service=github)](https://coveralls.io/github/seadsystem/Backend?branch=master)

#Go Landing Zone
The Go Landing Zone is a high speed, concurrent TCP server which listens for connections from SEAD plugs on port 9000. The plug requests its configuration from the server before beginning to send the buffered data it collects. This stream of packets is decoded, and the data is bulk inserted into the Postgres database in the background to be later queried by the API.

The service can be started using the init script installed via Puppet:
```sh
$ sudo service landingzone start
```

The landing zone is not intended to directly interacted with by a user. The data collected by the landing zone is accessible via the API. The correct way to test if the landing zone is functioning properly is to connect a SEAD plug and check if new data becomes available through the API.

Once we have the groundwork necessary to organize the protocol coordination between the SEAD panel and the landing zone, we will be able receive data from the SEAD panel and perform a classification of the raw data to determine the appliance type. This will complete our final user story:
>As a SEAD panel developer, I want to send raw appliance data to the server and a classification result to be rendered in the GUI.
##Installation

Installation of the Go Landing Zone and the Python API can be automated by using the Puppet modules included in the repository. The puppet modules are written for and assume to be executed on an Ubuntu 14.04 x64 Linux server. First you must install the prerequisites for running Puppet. From the terminal, execute:
```sh
$ sudo apt-get install puppet git
```
It is recommended that you also install fail2ban with the following command:
```sh
$ sudo apt-get install fail2ban
```

Copy the Puppet files onto the server (for example, by cloning the repository) and change to the DB directory.
```sh
$ cd DB/
```
If desired, configure the UNIX application user’s password in puppet/modules/config. First add the user credentials to manifests/credentials.pp, then uncomment the password definitions in config/manifests/init.pp.
```sh
$ cd puppet/config
$ nano manifests/credentials.pp
$ nano manifests/init.pp
$ cd ../..
```

Copy the files to the /etc/puppet directory, and execute Puppet:
```sh
$ sudo rsync -avc puppet/ /etc/puppet/
$ sudo puppet apply puppet/manifests/site.pp
```
After Puppet has executed the modules correctly, the server should be listening on ports 8080 and 9000. Verify with netstat:
```sh
$ netstat -tln | egrep ':(8080|9000)'
```
32 changes: 32 additions & 0 deletions constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Package contains constants and config for the program. Easier than a true config file.
*/
package constants

const (
HOST = "0.0.0.0" // All host names
SEAD_PLUG_PORT = "9000"
RPI_1_PORT = "9001"
EGAUGE_PORT = "9002"

LENGTH_HEADER_SIZE = 3 // Measured in bytes

// Packet constants for plug communication
HEAD = "@H\n"
ACK = "\x06\n"
OKAY = "@K\n"
CONFIG = "@SLITIsEP00000000500000C00001\n@SLITVsEP00000000500000C00001\n@SLITWsEP00000000500000C00001\n@SLITTsEP00000000500000C00001\n" // Sample config pulled from the old database for plug serial #00001

READ_TIME_LIMIT = 10 // Measured in seconds
WRITE_TIME_LIMIT = 5 // Measured in seconds

// Database
DB_SOCKET = "/var/run/postgresql" // Use localhost or appropriate hostname for IP connection.
DB_PORT = 5432
DB_USER = "landingzone"
DB_NAME = "seads"
DB_PASSWORD = "" // Password unneeded for Postgres peer authentication.
DB_MAX_CONNS = 90 // Should be slightly smaller than the max number in PostgreSQL's postgresql.conf file.
)

var Verbose bool = false
109 changes: 109 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Package database handles connecting and writing to the database.
*/
package database

import (
"database/sql"
"fmt"
"log"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/lib/pq"

"github.com/seadsystem/Backend/DB/landingzone/constants"
"github.com/seadsystem/Backend/DB/landingzone/decoders"
)

type DB struct {
conn *sql.DB
}

func New() (DB, error) {
conn, err := sql.Open("postgres", fmt.Sprintf("host=%s user=%s dbname=%s password=%s port=%d sslmode=disable", constants.DB_SOCKET, constants.DB_USER, constants.DB_NAME, constants.DB_PASSWORD, constants.DB_PORT))
return DB{conn}, err
}

func NewMock() (DB, sqlmock.Sqlmock, error) {
conn, mock, err := sqlmock.New()
return DB{conn}, mock, err
}

func (db DB) Close() error {
return db.conn.Close()
}

func (db DB) SetMaxOpenConns(n int) {
db.conn.SetMaxOpenConns(n)
}

func (db DB) Insert(iter decoders.Iterator) (err error) {
log.Println("Beginning transaction...")
// Begin transaction. Required for bulk insert
txn, err := db.conn.Begin()
if err != nil {
return
}

// Prepare bulk insert statement
stmt, err := txn.Prepare(pq.CopyIn("data_raw", "serial", "type", "data", "time", "device"))

// Cleanup either when done or in the case of an error
defer func() {
log.Println("Closing off transaction...")

if stmt != nil {
// Flush buffer
if _, eerr := stmt.Exec(); eerr != nil {
if err == nil {
err = eerr
}
}

// Close prepared statement
if cerr := stmt.Close(); cerr != nil {
if err == nil {
err = cerr
}
}
} else {
fmt.Println("stmt == nil")
}

// Rollback transaction on error
if err != nil {
txn.Rollback()
log.Println("Transaction rolled back")
return
}

// Commit transaction
err = txn.Commit()

log.Println("Transaction closed")
}()

// Check for error from preparing statement
if err != nil {
return
}

for {
var row *decoders.DataPoint
row, err = iter()
if row == nil || err != nil {
break
}

if constants.Verbose {
log.Println("Data:", row.Data)
log.Println("Time:", row.Time)
}

// Insert data. This is buffered.
if _, err = stmt.Exec(row.Serial, string(row.Type), row.Data, row.Time, row.Device); err != nil {
break
}
}
return
}
156 changes: 156 additions & 0 deletions database/database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package database

import (
"errors"
"testing"
"time"

sqlmock "github.com/DATA-DOG/go-sqlmock"

"github.com/seadsystem/Backend/DB/landingzone/constants"
"github.com/seadsystem/Backend/DB/landingzone/decoders"
)

func TestNewMock(t *testing.T) {
db, mock, err := NewMock()
mock.ExpectClose()
if err != nil {
t.Fatalf("got NewMock() = _, %v, want = _, nil", err)
}
if err := db.Close(); err != nil {
t.Fatalf("got db.Close() = %v, want = nil", err)
}
}

func TestNew(t *testing.T) {
if _, err := New(); err != nil {
t.Fatalf("got New() = _, %v, want = _, nil", err)
}
}

func TestSetMaxOpenConns(t *testing.T) {
db, mock, err := NewMock()
if err != nil {
t.Fatalf("got NewMock() = _, _, %v, want = _, _, nil", err)
}
mock.ExpectClose()

db.SetMaxOpenConns(5)
if err := db.Close(); err != nil {
t.Fatalf("got db.Close() = %v, want = nil", err)
}
}

func TestInsert(t *testing.T) {
oldVerbosity := constants.Verbose
constants.Verbose = true
defer func() { constants.Verbose = oldVerbosity }()

closureIndex := 0
var iter = func() (*decoders.DataPoint, error) {
if closureIndex >= 3 {
return nil, nil
}
point := &decoders.DataPoint{
Serial: 64,
Type: 'T',
Data: int64(closureIndex),
Time: time.Unix(int64(500+closureIndex), 0),
}
closureIndex++
return point, nil
}

db, mock, err := NewMock()
if err != nil {
t.Fatalf("got NewMock() = _, _, %v, want = _, _, nil", err)
}
mock.ExpectBegin()
query := "COPY \\\"data_raw\\\" \\(\\\"serial\\\", \\\"type\\\", \\\"data\\\", \\\"time\\\", \\\"device\\\"\\) FROM STDIN"
stmt := mock.ExpectPrepare(query)
stmt.ExpectExec().WithArgs(64, "T", 0, time.Unix(500, 0), nil).WillReturnResult(sqlmock.NewResult(0, 1))
stmt.ExpectExec().WithArgs(64, "T", 1, time.Unix(501, 0), nil).WillReturnResult(sqlmock.NewResult(0, 1))
stmt.ExpectExec().WithArgs(64, "T", 2, time.Unix(502, 0), nil).WillReturnResult(sqlmock.NewResult(0, 1))
stmt.ExpectExec().WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectCommit()

if err := db.Insert(iter); err != nil {
t.Errorf("got db.Insert(iter) = %v, want = nil", err)
}
}

func TestInsertBeginErr(t *testing.T) {
db, _, err := NewMock()
if err != nil {
t.Fatalf("got NewMock() = _, _, %v, want = _, _, nil", err)
}

if err := db.Insert(func() (*decoders.DataPoint, error) { return nil, nil }); err == nil || err.Error() != "all expectations were already fulfilled, call to database transaction Begin was not expected" {
t.Errorf("got db.Insert() = %v, want = nil", err)
}
}

func TestInsertPrepareErr(t *testing.T) {
db, mock, err := NewMock()
if err != nil {
t.Fatalf("got NewMock() = _, _, %v, want = _, _, nil", err)
}
mock.ExpectBegin()
mock.ExpectRollback()

want := `call to Prepare stetement with query 'COPY "data_raw" ("serial", "type", "data", "time", "device") FROM STDIN', was not expected, next expectation is: ExpectedRollback => expecting transaction Rollback`
if err := db.Insert(func() (*decoders.DataPoint, error) { return nil, nil }); err == nil || err.Error() != want {
t.Errorf("got db.Insert() = %v, want = %s", err, want)
}
}

func TestInsertFlushErr(t *testing.T) {
db, mock, err := NewMock()
if err != nil {
t.Fatalf("got NewMock() = _, _, %v, want = _, _, nil", err)
}
mock.ExpectBegin()
mock.ExpectPrepare("COPY \\\"data_raw\\\" \\(\\\"serial\\\", \\\"type\\\", \\\"data\\\", \\\"time\\\", \\\"device\\\"\\) FROM STDIN")
mock.ExpectRollback()

want := `call to exec query 'COPY "data_raw" ("serial", "type", "data", "time", "device") FROM STDIN' with args [], was not expected, next expectation is: ExpectedRollback => expecting transaction Rollback`
if err := db.Insert(func() (*decoders.DataPoint, error) { return nil, nil }); err == nil || err.Error() != want {
t.Errorf("got db.Insert() = %v, want = %s", err, want)
}
}

func TestInsertCloseErr(t *testing.T) {
db, mock, err := NewMock()
if err != nil {
t.Fatalf("got NewMock() = _, _, %v, want = _, _, nil", err)
}
mock.ExpectBegin()
want := errors.New("STMT ERROR")
stmt := mock.ExpectPrepare("COPY \\\"data_raw\\\" \\(\\\"serial\\\", \\\"type\\\", \\\"data\\\", \\\"time\\\", \\\"device\\\"\\) FROM STDIN").WillReturnCloseError(want)
stmt.ExpectExec().WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectRollback()

if err := db.Insert(func() (*decoders.DataPoint, error) { return nil, nil }); err == nil || err.Error() != want.Error() {
// TODO: Figure out why this test doesn't work.
// Disabled until issue is fixed:
// https://github.com/DATA-DOG/go-sqlmock/issues/25
//t.Errorf("got db.Insert() = %v, want = %v", err, want)
}
}

func TestInsertErr(t *testing.T) {
db, mock, err := NewMock()
if err != nil {
t.Fatalf("got NewMock() = _, _, %v, want = _, _, nil", err)
}
mock.ExpectBegin()
query := "COPY \\\"data_raw\\\" \\(\\\"serial\\\", \\\"type\\\", \\\"data\\\", \\\"time\\\", \\\"device\\\"\\) FROM STDIN"
stmt := mock.ExpectPrepare(query)
stmt.ExpectExec().WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectRollback()

want := "call to exec query 'COPY \"data_raw\" (\"serial\", \"type\", \"data\", \"time\", \"device\") FROM STDIN' with args [0 \x00 0 0001-01-01 00:00:00 +0000 UTC <nil>], was not expected, next expectation is: ExpectedRollback => expecting transaction Rollback"
if err := db.Insert(func() (*decoders.DataPoint, error) { return &decoders.DataPoint{}, nil }); err == nil || err.Error() != want {
t.Errorf("got db.Insert(iter) = %v, want = %s", err, want)
}
}
28 changes: 28 additions & 0 deletions decoders/decoders.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package decoders

import (
"errors"
"time"
)

type DataPoint struct {
Serial int64
Type byte
Device *string
Data int64
Time time.Time
}

type Iterator func() (*DataPoint, error)

func NewErrorIterator(err error) func() (*DataPoint, error) {
return func() (*DataPoint, error) {
return nil, err
}
}

func NewEmptyIterator() func() (*DataPoint, error) {
return NewErrorIterator(nil)
}

var NoData error = errors.New("no data in packet")
Loading

0 comments on commit d8136f2

Please sign in to comment.