Skip to content

Commit

Permalink
Merge pull request #43 from ohnosequences/pr/43
Browse files Browse the repository at this point in the history
Refactor SNS-related code
  • Loading branch information
laughedelic authored Oct 12, 2016
2 parents 18bbd6e + 5503ef2 commit ff7e1f2
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 120 deletions.
35 changes: 0 additions & 35 deletions src/main/scala/ohnosequences/awstools/sns/SNS.scala

This file was deleted.

85 changes: 0 additions & 85 deletions src/main/scala/ohnosequences/awstools/sns/Topic.scala

This file was deleted.

15 changes: 15 additions & 0 deletions src/main/scala/ohnosequences/awstools/sns/client.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package ohnosequences.awstools.sns

import com.amazonaws.services.sns.model._
import com.amazonaws.services.sns.AmazonSNS
import scala.util.Try

case class ScalaSNSClient(val asJava: AmazonSNS) extends AnyVal { sns =>

def getOrCreate(name: String): Try[Topic] = Try {
asJava.createTopic(name)
}.map { response =>
Topic(asJava, response.getTopicArn)
}

}
21 changes: 21 additions & 0 deletions src/main/scala/ohnosequences/awstools/sns/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package ohnosequences.awstools

import com.amazonaws.auth._
import com.amazonaws.services.sns.{ AmazonSNS, AmazonSNSClient }
import ohnosequences.awstools.regions._

package object sns {

def client(
region: Region,
credentials: AWSCredentialsProvider = new DefaultAWSCredentialsProviderChain()
): AmazonSNSClient = {
new AmazonSNSClient(credentials)
.withRegion(region.toAWSRegion)
}

// Implicits
implicit def toScalaSNSClient(sns: AmazonSNS):
ScalaSNSClient =
ScalaSNSClient(sns)
}
103 changes: 103 additions & 0 deletions src/main/scala/ohnosequences/awstools/sns/topics.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package ohnosequences.awstools.sns

import ohnosequences.awstools.sqs.Queue

import com.amazonaws.services.sns.AmazonSNS
import com.amazonaws.services.sns.model._
import com.amazonaws.auth.policy.{Resource, Principal, Statement, Policy}
import com.amazonaws.auth.policy.Statement.Effect
import com.amazonaws.auth.policy.actions.SQSActions
import com.amazonaws.auth.policy.conditions.ConditionFactory
import com.amazonaws.services.sqs.model.QueueAttributeName
import scala.collection.JavaConversions._
import scala.util.Try
import java.net.URI

case class Topic(
sns: AmazonSNS,
arn: String
) { topic =>

def delete(): Try[Unit] = Try { sns.deleteTopic(topic.arn) }


def publish(msg: String): Try[String] = Try {
sns.publish(topic.arn, msg).getMessageId
}

def publish(msg: String, subject: String): Try[String] = Try {
sns.publish(topic.arn, msg, subject).getMessageId
}

// TODO: publishJSON with dispatch by the subscriber protocol

// TODO: make it return a Stream making more requests only if needed
def listAllSubscriptions: Seq[Subscription] = {

@scala.annotation.tailrec
def tokens_rec(response: ListSubscriptionsByTopicResult, acc: Seq[Subscription]): Seq[Subscription] = {
val subs = response.getSubscriptions

// NOTE: next token is null if there's nothing more to list
Option(response.getNextToken) match {
case Some(token) if (subs.nonEmpty) => tokens_rec(
sns.listSubscriptionsByTopic(topic.arn, token),
subs ++ acc
)
case _ => acc
}
}

tokens_rec(sns.listSubscriptionsByTopic(topic.arn), Seq())
}

def subscribe(subscriber: Subscriber): Try[String] = Try {
sns.subscribe(
topic.arn,
subscriber.protocol,
subscriber.endpoint
).getSubscriptionArn
}

def subscribed(subscriber: Subscriber): Boolean = {

listAllSubscriptions.exists { sub =>
(sub.getProtocol == subscriber.protocol) &&
(sub.getEndpoint == subscriber.endpoint)
}
}

// def setAttribute(name: String, value: String) {
// sns.setTopicAttributes(new SetTopicAttributesRequest(topic.arn, name, value))
// }

override def toString = topic.arn
}


sealed abstract class Subscriber private[awstools](
val protocol: String,
val endpoint: String
)

case object Subscriber {

// delivery of JSON-encoded message via HTTP POST
case class http(uri: URI) extends Subscriber("http", uri.normalize.toString)
// delivery of JSON-encoded message via HTTPS POST
case class https(uri: URI) extends Subscriber("https", uri.normalize.toString)
// delivery of message via SMTP
case class email(addr: String) extends Subscriber("email", addr)
// delivery of JSON-encoded message via SMTP
case class email_json(addr: String) extends Subscriber("email-json", addr)
// delivery of message via SMS
case class sms(phone: Long) extends Subscriber("sms", math.abs(phone).toString)
// delivery of JSON-encoded message to an Amazon SQS queue
case class sqs(queue: Queue) extends Subscriber("sqs", queue.arn)

// TODO:
// delivery of JSON-encoded message to an EndpointArn for a mobile app and device.
// case class application()
// delivery of JSON-encoded message to an AWS Lambda function.
// case class lambda(lamdba: Lambda)
}

0 comments on commit ff7e1f2

Please sign in to comment.