googlePubsubR

This vignette provides a walkthrough of the most common functions, which cover the majority of use cases.

Authentication

In order to authenticate the client, the following environment variables will need to be set:

  • GCP_AUTH_FILE: path to the .json file containing GCP credentials for a service account enabled to interact with the Pub/Sub API. You can also directly pass the path as a function argument.
  • GCP_PROJECT: your GCP project id.

Upon setting the environment variables, it is just a matter of calling:

pubsub_auth()

More authentication methods can be found in the ?pubsub_auth documentation.

Creating and getting resources

Topics

In their most basic form, topics can be created in the following way:

# Create a topic that retains messages for a minimum of 4 hours.
# We'll attach a custom label to make it easier to spot.
topic <- topics_create(
  name = "vignette-topic",
  labels = list(type = "pkg_vignette"),
  message_retention_duration = 14400
)

# A topic object is returned
topic

# $labels
# $labels$type
# [1] "pkg_vignette"
# 
# 
# $name
# [1] "projects/<my-gcp-project>/topics/vignette-topic"
# 
# $kmsKeyName
# NULL
# 
# $satisfiesPzs
# NULL
# 
# $messageStoragePolicy
# NULL
# 
# $schemaSettings
# NULL
# 
# $messageRetentionDuration
# [1] "14400s"
# 
# attr(,"class")
# [1] "Topic" "list"

It is possible to interact with pre-existing topic objects. For instance, one could retrieve a Topic object from an existing topic:

# Fetch the Topic object only when the topic is already there
if (topics_exists("vignette-topic")) {
  topic <- topics_get("vignette-topic")
}

In order to consume messages from a topic, a subscription will be needed:

# `topic` accepts either a Topic object or a topic name
sub <- subscriptions_create(
  name = "vignette-sub",
  topic = topic,
  # The subscription will expire after 1 day (86400 seconds) of inactivity
  expiration_policy = ExpirationPolicy(86400),
  # We'll retain unacked messages for 12 hours (43200 seconds)
  msg_retention_duration = 43200,
  # We'll retry message delivery with at least 1 second delay from the previous try
  retry_policy = RetryPolicy(min_backoff = 1)
)

# A Subscription object is returned
sub

# $deadLetterPolicy
# NULL
# 
# $messageRetentionDuration
# [1] "43200s"
# 
# $labels
# NULL
# 
# $retryPolicy
# $retryPolicy$minimumBackoff
# [1] "1s"
# 
# $retryPolicy$maximumBackoff
# [1] "600s"
# 
# 
# $pushConfig
# named list()
# 
# $ackDeadlineSeconds
# [1] 10
# 
# $expirationPolicy
# $expirationPolicy$ttl
# [1] "86400s"
# 
# 
# $filter
# NULL
# 
# $detached
# NULL
# 
# $retainAckedMessages
# NULL
# 
# $topic
# [1] "projects/<my-gcp-project>/topics/vignette-topic"
# 
# $name
# [1] "projects/<my-gcp-project>/subscriptions/vignette-sub"
# 
# $enableMessageOrdering
# NULL
# 
# $topicMessageRetentionDuration
# [1] "14400s"
# 
# attr(,"class")
# [1] "Subscription" "list"

We can also inspect all subscriptions attached to a given topic.

# List the subscriptions attached to the topic
topics_list_subscriptions(topic = "vignette-topic")

Schemas

Schemas can be used to enforce a specific format on incoming messages; any malformed message will be discarded or sent to a dead letter queue. A schema object can be passed to a topic upon creation. Getting the schema definition in the right format (Pub/Sub expects it as a string) can be quite fiddly. In this example we’ll define an AVRO schema.

avro_schema <- schemas_create(
  name = "vignette-schema",
  type = "AVRO",
  # toJSON as the API expects a string containing the definition of the AVRO schema
  definition = toJSON(
    list(
      name = "cutlery",
      type = "record",
      fields = list(
        list(name = "name", type = "string"),
        list(name = "price", type = "int")
      )
    ),
    # Spell out TRUE: `T` is an ordinary variable that can be reassigned
    auto_unbox = TRUE
  )
)

# Test a message against the schema: the payload is serialised to JSON and
# base64-encoded before being wrapped in a PubsubMessage
msg <- list(
  name = "John",
  price = 123
) %>%
  toJSON(auto_unbox = TRUE) %>%
  charToRaw() %>%
  base64enc::base64encode() %>%
  PubsubMessage()

schemas_validate_message(schema = "vignette-schema", message = msg, encoding = "JSON")
# [1] TRUE

# Create a new topic and attach the schema.
# NOTE(review): a topic named "vignette-topic" was already created earlier in
# this walkthrough; presumably it has to be deleted first (or a different
# name used) for this call to succeed — confirm.
topic <- topics_create(
  name = "vignette-topic",
  message_retention_duration = 14400,
  schema_settings = SchemaSettings(encoding = "JSON", schema = "vignette-schema")
)

Snapshots and seek

There are two ways to seek a subscription back in time:

  • Provide a snapshot of the subscription
  • Via a timestamp in RFC3339 UTC “Zulu” format
# Take a snapshot of the subscription
snapshot <- snapshots_create(name = "vignette-snap", subscription = "vignette-sub")

# Seek the subscription back to the snapshot taken above
subscriptions_seek(subscription = "vignette-sub", snapshot = snapshot)
# [1] TRUE

# Alternatively, seek the subscription to a specific timestamp
subscriptions_seek(subscription = "vignette-sub", time = "2021-11-08T23:55:00Z")

Messages

Pub/Sub messages are expected to be encoded as base64 strings. Depending on the object you’re dealing with, the process to convert it into the right format might vary. Below we’ll convert a data frame into a format that will not upset Pub/Sub that much:

# Serialise the data frame to JSON, base64-encode it and wrap it in a PubsubMessage
msg <- data.frame(name = "fork", price = 999) %>%
  as.list() %>%
  toJSON(auto_unbox = TRUE) %>%
  charToRaw() %>%
  base64enc::base64encode() %>%
  PubsubMessage()

topics_publish(messages = msg, topic = "vignette-topic")

Now that we have successfully published a message (conforming to the schema specified above) we can pull messages from the subscription. Note that pulling will not take messages out of the queue (you can do it as many times as you want). Messages need to be acknowledged with subscriptions_ack after they have been successfully consumed in order to be taken out of the queue.

Pulling messages will return a list containing a receivedMessages data frame, which holds the ackIds and a nested data frame storing the message data and metadata.

# Pull the messages waiting on the subscription
msgs <- subscriptions_pull("vignette-sub")

# Show the structure of the returned list
tibble::glimpse(msgs)
List of 1
 $ receivedMessages:'data.frame':   1 obs. of  2 variables:
  ..$ ackId  : chr "PkVTRFAGFixdRkhRNxkIaFEOT14jPzUgKEUSC1MTUVx1A1MQaVwzdQdRDRlzejV1aQwRVAsUUHRfURsfWVxEjczJsS9QXWJxa1oQAwJHUH1YUxw"| __truncated__
  ..$ message:'data.frame': 1 obs. of  4 variables:
  .. ..$ data       : chr "eyJuYW1lIjoiZm9yayIsInByaWNlIjo5OTl9"
  .. ..$ attributes :'data.frame':  1 obs. of  2 variables:
  .. ..$ messageId  : chr "3410320233787713"
  .. ..$ publishTime: chr "2021-11-09T09:06:31.171Z"

# Show the structure of the nested message data frame
tibble::glimpse(msgs$receivedMessages$message)
Rows: 1
Columns: 4
$ data        <chr> "eyJuYW1lIjoiZm9yayIsInByaWNlIjo5OTl9"
$ attributes  <df[,2]> <data.frame[1 x 2]>
$ messageId   <chr> "3410320233787713"
$ publishTime <chr> "2021-11-09T09:06:31.171Z"

Decoding incoming messages

In order to convert message data back into a usable format we’ll basically need to reverse the process that was used to encode it in the first place. Given that subscriptions_pull will return all messages in the queue, a good strategy might be to decode them all at once using lapply:

# Decode every pulled payload by reversing the encoding steps
# (base64 -> raw -> JSON string -> data frame), then stack the per-message
# data frames into a single one.
decoded_msg <- lapply(msgs$receivedMessages$message$data, function(msg) {
  msg %>%
    # Namespaced like the base64enc::base64encode() calls used for encoding,
    # so it works without attaching base64enc
    base64enc::base64decode() %>%
    rawToChar() %>%
    fromJSON(flatten = TRUE, simplifyDataFrame = TRUE) %>%
    as.data.frame()
}) %>% do.call(rbind, .)

decoded_msg
#   name price
# 1 fork   999

# ... Do something with the dataframe

# Acknowledge we have successfully used it
subscriptions_ack(ack_ids = msgs$receivedMessages$ackId, subscription = "vignette-sub")
# [1] TRUE

This approach is quite cumbersome; helpers to facilitate this process might be included in the library later on, once a general enough approach is found.